home *** CD-ROM | disk | FTP | other *** search
/ Sprite 1984 - 1993 / Sprite 1984 - 1993.iso / src / cmds / pmake / customs / rpc.c < prev    next >
Encoding:
C/C++ Source or Header  |  1989-11-15  |  80.5 KB  |  2,927 lines

  1. /*-
  2.  * rpc.c --
  3.  *    Remote Procedure Call and timeout mechanism for customs.
  4.  *
  5.  * Copyright (c) 1988, 1989 by the Regents of the University of California
  6.  * Copyright (c) 1988, 1989 by Adam de Boor
  7.  * Copyright (c) 1989 by Berkeley Softworks
  8.  *
  9.  * Permission to use, copy, modify, and distribute this
  10.  * software and its documentation for any non-commercial purpose
  11.  * and without fee is hereby granted, provided that the above copyright
  12.  * notice appears in all copies.  The University of California,
  13.  * Berkeley Softworks and Adam de Boor make no representations about
  14.  * the suitability of this software for any purpose.  It is provided
  15.  * "as is" without express or implied warranty.
  16.  */
  17. #ifndef lint
  18. static char *rcsid =
  19. "$Id: rpc.c,v 1.20 89/11/14 13:46:18 adam Exp $ SPRITE (Berkeley)";
  20. #endif lint
  21.  
  22. /*
  23.  * The RPC mechanism implemented in this file was designed for the customs
  24.  * agent. It supports both udp and tcp transport and requires the complete
  25.  * address of the destination each time (for udp). It also supports broadcast
  26.  * messages as required.
  27.  *
  28.  * A server is created for any <socket, procNum> pair using the function
  29.  * Rpc_ServerCreate(socket, procNum, handleProc, swapArgsProc, swapReplyProc,
  30.  * handleData). The handler function may be called at any time, even while a
  31.  * call is being made on another socket, so handlers should be written
  32.  * accordingly.
  33.  *
  34.  * Two swapping procedures may be provided to byte-swap the call arguments and
  35.  * reply data should the byteorder of the calling host be different from that
  36.  * on the local host. If no data are passed and/or returned, or swapping
  37.  * isn't necessary, Rpc_SwapNull may be given.
  38.  *
  39.  * The system is organized around an event queue and select masks. Events
  40.  * occur at scheduled times with the process sleeping in between events by
  41.  * waiting on a set of streams, as defined by the select masks. Multiple calls
  42.  * may be pending -- each one has an event for resending and replies are
  43.  * handled by the selection mechanism. The event queue is ordered by timeout
  44.  * time. The function Rpc_Wait is what processes the queue and calls the
  45.  * various handlers. Everything here eventually makes its way back to
  46.  * Rpc_Wait.
  47.  *
  48.  * Each stream may have only one function to handle its readiness. Thus a
  49.  * program should not express interest in the stream unless it has unregistered
  50.  * interest in all rpc services on that stream.
  51.  *
  52.  * <remaddr, remport, sock, procnum, message-id> is a 5-tuple that uniquely
  53.  * identifies a given RPC call. This tuple is used for caching calls and
  54.  * responses to them.
  55.  *
  56.  * The protocol relies on the acknowledgement implicit in the return of
  57.  * data. Every service routine must call Rpc_Return() at least once, even
  58.  * if no data are to be returned. The only time an explicit acknowledgement
  59.  * packet is sent is when a duplicate call is received for one that is
  60.  * currently being processed. The acknowledgement indicates that the client
  61.  * is to continue waiting and resending at whatever interval it chooses.
  62.  */
  63.  
  64. #define FD_SETSIZE  256        /* Make sure this is big enough for both */
  65.                 /* Sun and ISI... */
  66.  
  67. #include    <sys/time.h>
  68. #include    <sys/ioctl.h>
  69. #include    <sys/types.h>
  70. #include    <sys/socket.h>
  71. #include    <sys/file.h>
  72. #include    <netinet/in.h>
  73. #include    <net/if.h>
  74. #include    <arpa/inet.h>
  75. #include    <sys/uio.h>
  76. #include    <errno.h>
  77. #include    <stdio.h>
  78. #include    <setjmp.h>
  79. #include    <sys/signal.h>
  80.  
  81. extern int errno;        /* Not all systems define this */
  82.  
  83. #include    "rpc.h"
  84.  
  85.  
  86. #ifndef MAX_DATA_SIZE
  87. #define MAX_DATA_SIZE    2048
  88. #endif  /* MAX_DATA_SIZE */
  89.  
  90. #ifndef MAXNETS
  91. #define MAXNETS            10      /* Maximum number of networks a machine may
  92.                  * be on (Max # Rpc_Broadcast will broadcast
  93.                  * to, anyway) */
  94. #endif /* MAXNETS */
  95. /*
  96.  * Macro for adding two time values together into a third.
  97.  */
  98. #define timeadd(tv1,tv2,tvd) \
  99. /*struct timeval *tv1, *tv2, *tvd;*/ \
  100. {\
  101.     (tvd)->tv_usec = (tv1)->tv_usec + (tv2)->tv_usec;\
  102.     if ((tvd)->tv_usec >= 1000000) {\
  103.     (tvd)->tv_sec = (tv1)->tv_sec + (tv2)->tv_sec + 1;\
  104.     (tvd)->tv_usec -= 1000000;\
  105.     } else {\
  106.     (tvd)->tv_sec = (tv1)->tv_sec + (tv2)->tv_sec;\
  107.     }\
  108. }
  109.  
  110. /*
  111.  * Macro to deal with incompatible calling conventions between gcc and cc on
  112.  * a sparc (gcc passes the address in a register, since the structure is
  113.  * small enough, while cc still passes the address).
  114.  */
  115. #if defined(__GNUC__) && defined(sparc)
  116. #define InetNtoA(addr)    inet_ntoa(&(addr))
  117. #else
  118. #define InetNtoA(addr)    inet_ntoa(addr)
  119. #endif
  120.  
  121. /*
  122.  * The ID of a message is simply an unsigned long that increments for each
  123.  * message.
  124.  */
  125. typedef unsigned long RpcID;
  126. #define RpcIDEqual(id1, id2)    (id1 == id2)
  127. #define RpcHash(id)            (((id) ^ (id >> 3) ^ (id >> 9)) & \
  128.                  (CACHE_THREADS-1))
  129.  
  130. /*
  131.  * RPC CACHE DEFINITIONS
  132.  */
  133. /*
  134.  * The cache is kept as a table of doubly-linked lists, hashed on the ID of the
  135.  * message, hanging from the RpcServer structure for the procedure call. The
  136.  * 'prev' pointer of the first entry in the chain points to the pointer in
  137.  * the table for that chain (i.e. *e->prev == e). Cache entries are flushed 10
  138.  * seconds after their last reference.
  139.  *
  140.  * A cache entry may be in one of two states: replied-to or reply-pending.
  141.  * If a call comes in that maps to an entry for which a reply is still
  142.  * pending, an RPC_ACKNOWLEDGE message is returned, informing the caller that
  143.  * the call is still in progress.
  144.  *
  145.  * If a call comes in that maps to an entry that has been replied-to, the
  146.  * reply is resent and the call dropped.
  147.  */
  148. typedef struct CacheEntry {
  149.     enum {
  150.     REPLY_PENDING,                    /* Service of call is in progress */
  151.     REPLY_SENT,                    /* Reply has been sent already */
  152.     }                  status;        /* Status of call */
  153.     RpcID             id;         /* ID of call */
  154.     struct sockaddr_in    from;        /* Where call came from */
  155.     Rpc_Event          flushEvent; /* Event to flush cache entry */
  156.     Rpc_Stat          error;        /* If not RPC_SUCCESS, contains the
  157.                      * error code returned for the call */
  158.     Rpc_Opaque          replyData;  /* Data for reply */
  159.     int                  replySize;  /* Size of reply data */
  160.     struct CacheEntry    *next;        /* Next call in cache */
  161.     struct CacheEntry    *prev;        /* Previous call in cache or address of
  162.                      * pointer to head of chain if head of
  163.                      * chain */
  164. } CacheEntry;
  165.  
  166. #define CACHE_THREADS    8      /* The number of chains in each server
  167.                  * cache. NOTE: dependence on this value
  168.                  * in RpcHash() -- must be (2^n) */
  169. /*
  170.  * RPC CALL DEFINITIONS
  171.  */
  172. /*
  173.  * The MsgHeader is prepended to each out-going message and is expected to
  174.  * be present on each incoming message on a service stream. All fields
  175.  * larger than a byte are presented in network-byte-order (except the id,
  176.  * which is unique in any byte-order, so long as the byte-order is consistent).
  177.  * Only one of the message type bits may be on at once.
  178.  */
  179. typedef struct {
  180.     RpcID             id;         /* Message id */
  181.     u_long            byteOrder;  /* Magic number indicating the byte order
  182.                      * of the sending host. */
  183.     Rpc_Proc             procNum;    /* Procedure number */
  184.     short             flags;        /* Flags */
  185.                                       /* MESSAGE TYPES: */
  186. #define RPC_CALL      0x0001       /* Message is a call */
  187. #define RPC_REPLY     0x0002       /* Message is a reply */
  188. #define RPC_ERROR     0x0004       /* Message is an error reply. Status
  189.                      * is message data */
  190. #define RPC_ACKNOWLEDGE    0x0008            /* Message is acknowledgement, not
  191.                      * reply, indicating service still
  192.                      * in progress */
  193.                                       /* MODIFIERS: */
  194. #define RPC_BROADCAST    0x0100       /* Message is broadcast, so don't
  195.                      * reply with errors */
  196.     int                  length;        /* Length of following data */
  197. } MsgHeader;
  198.  
  199. #define RpcTypeMask        (RPC_CALL|RPC_REPLY|RPC_ERROR|RPC_ACKNOWLEDGE)
  200. #define RpcIsCall(hdrPtr)   (((hdrPtr)->flags&RpcTypeMask)==RPC_CALL)
  201. #define RpcIsReply(hdrPtr)  (((hdrPtr)->flags&RpcTypeMask)==RPC_REPLY)
  202. #define RpcIsError(hdrPtr)  (((hdrPtr)->flags&RpcTypeMask)==RPC_ERROR)
  203. #define RpcIsAck(hdrPtr)    (((hdrPtr)->flags&RpcTypeMask)==RPC_ACKNOWLEDGE)
  204.  
  205. #define RPC_MAGIC     0x03020100  /* Magic number placed in byteOrder
  206.                      * field of outgoing message to indicate
  207.                      * sending host's byte order */
  208.  
  209. /*
  210.  * The RpcCall structure contains all information needed to track an RPC
  211.  * call and its replies. It is given as the argument for RpcResend when the
  212.  * resend timer expires.
  213.  */
  214. typedef struct RpcCall {
  215.     struct RpcCall    *next;        /* Next in chain for socket */
  216.     RpcID             id;         /* ID number of message */
  217.  
  218.     /*
  219.      * Information for receiving replies
  220.      */
  221.     Rpc_Stat           status;        /* Status of call */
  222.     struct sockaddr_in    remote;        /* Address of responder */
  223.     int                  replyLen;   /* Expected length of reply */
  224.     Rpc_Opaque           reply;        /* Place to store reply data */
  225.     Boolean              replied;    /* Reply received */
  226.  
  227.     /*
  228.      * Information for issuing the call.
  229.      */
  230.     int                  sock;        /* Socket over which to make the call */
  231.     struct msghdr     message;    /* Outgoing message */
  232.     int                  numRetries; /* Number of resends left */
  233.     Rpc_Event       resend;        /* Event used for resending */
  234. } RpcCall;
  235.  
  236. static RpcCall          *rpcCalls[FD_SETSIZE];
  237.  
  238. /*
  239.  * RPC SERVER DEFINITIONS
  240.  */
  241. /*
  242.  * A service is established for a stream by linking a RpcServer structure into
  243.  * its rpcServers list. The handler function is called when a call on the
  244.  * procedure procNum is received over the stream.
  245.  */
  246. typedef struct RpcServer {
  247.     struct RpcServer    *next;
  248.     Rpc_Proc             procNum;            /* Procedure number */
  249.     void              (*serverProc)();        /* Function to handle it */
  250.     void              (*swapArgsProc)();        /* Function to swap args */
  251.     void        (*swapReplyProc)();     /* Function to swap reply */
  252.     Rpc_Opaque           data;                    /* Datum to pass procs */
  253.     CacheEntry          *cache[CACHE_THREADS];    /* Call cache */
  254. } RpcServer;
  255.  
  256. /*
  257.  * The RpcMessage structure contains all the information needed to reply to
  258.  * an rpc call.
  259.  */
  260. typedef struct RpcMessage {
  261.     MsgHeader          *header;        /* Original message header */
  262.     struct sockaddr_in    *remote;        /* Address of caller */
  263.     int                  sock;            /* Socket on which to reply */
  264.     CacheEntry          *e;             /* Entry in server cache to modify */
  265.     RpcServer        *server;    /* Server for message (so we can
  266.                      * swap the reply) */
  267. } RpcMessage;
  268.  
  269. static RpcServer      *rpcServers[FD_SETSIZE];
  270.  
  271. /*
  272.  * WAIT DEFINITIONS
  273.  */
  274. /*
  275.  * RpcEvent structures are what make up the event queue (events) around which
  276.  * this system revolves. Each has a time at which the event should occur and
  277.  * a function to call when the event happens, along with a single piece of
  278.  * data to be passed to the function. The event queue is time-ordered.
  279.  */
  280. typedef struct RpcEvent {
  281.     struct RpcEvent      *next;
  282.     struct timeval    timeout;        /* Time at which event should occur */
  283.     struct timeval    interval;       /* Interval at which event should
  284.                      * recur. */
  285.     Boolean              (*handler)();    /* Function to be called at timeout */
  286.     Rpc_Opaque           data;            /* Datum to pass it */
  287. } RpcEvent;
  288.  
  289. static RpcEvent          *events;        /* All waiting events */
  290.  
  291. /*
  292.  * The 'streams' array contains the information needed to handle the readiness
  293.  * of a stream. The 'state' field is set from the arguments to Rpc_Watch and
  294.  * is used to remove the stream from the select masks when interest in it
  295.  * changes. If 'state' is 0, noone is interested in the stream.
  296.  */
  297. static struct {
  298.     int                  state;             /* Current interest */
  299.     void              (*handler)();    /* Function to handle readiness */
  300.     Rpc_Opaque           data;            /* Datum to pass to handler */
  301. }                   streams[FD_SETSIZE];
  302.  
  303. fd_set                  rpc_readMask;    /* Readable stream select mask */
  304. fd_set            rpc_writeMask;    /* Writeable stream select mask */
  305. fd_set            rpc_exceptMask;    /* Exceptable stream select mask */
  306.  
  307. /*
  308.  * MISCELLANEOUS DEFINITIONS
  309.  */
  310. static Boolean          rpcDebug;       /* Print debugging info */
  311. static void           RpcTcpAccept();    /* Perform ACCEPT on TCP RPC socket */
  312.  
  313. /*
  314.  * Interval for flushing a call entry from the cache
  315.  */
  316. static struct timeval    flushTimeOut = {
  317.     10, 0
  318. };
  319.  
  320. /*-
  321.  *-----------------------------------------------------------------------
  322.  * RpcUniqueID --
  323.  *    Return an unique identifier for a message. Potential clashes
  324.  *    between hosts are reduced by using a random number on startup.
  325.  *
  326.  * Results:
  327.  *    The identifier.
  328.  *
  329.  * Side Effects:
  330.  *    None.
  331.  *
  332.  *-----------------------------------------------------------------------
  333.  */
  334. static RpcID
  335. RpcUniqueID()
  336. {
  337.     static RpcID  nextID = 0;
  338.  
  339.     if (nextID == 0) {
  340.     srandom(time(0) ^ getpid());
  341.     nextID = random();
  342.     } else {
  343.     nextID += 1;
  344.     }
  345.     return (nextID);
  346. }
  347.  
  348. /*-
  349.  *-----------------------------------------------------------------------
  350.  * RpcCacheFlushEntry --
  351.  *    Flush an entry from a server's cache when it hasn't been
  352.  *    referenced in a while.
  353.  *
  354.  * Results:
  355.  *    False -- no need to stay awake.
  356.  *
  357.  * Side Effects:
  358.  *    The CacheEntry structure is removed from the server's cache
  359.  *    and freed.
  360.  *
  361.  *-----------------------------------------------------------------------
  362.  */
  363. static Boolean
  364. RpcCacheFlushEntry(e, ev)
  365.     register CacheEntry    *e;     /* Entry to flush */
  366.     Rpc_Event          ev;     /* Event that caused this call */
  367. {
  368.     if (rpcDebug) {
  369.     printf("Flushing entry for %u\n", e->id);
  370.     }
  371.     
  372.     if ((e->flushEvent != ev) /* && rpcDebug */) {
  373.     printf("RpcCacheFlushEntry: flushEvent (%x) != ev (%x)\n",
  374.            e->flushEvent, ev);
  375.     }
  376.     Rpc_EventDelete(ev);
  377.     if (*((CacheEntry **)e->prev) == e) {
  378.     /*
  379.      * Head of chain: move head to next
  380.      */
  381.     *((CacheEntry **)e->prev) = e->next;
  382.     } else {
  383.     /*
  384.      * Element of chain: link previous to next
  385.      */
  386.     e->prev->next = e->next;
  387.     }
  388.  
  389.     /*
  390.      * Link next element to previous (this also sets up the prev field
  391.      * properly if e was the head of the chain).
  392.      */
  393.     if (e->next != (CacheEntry *)NULL) {
  394.     e->next->prev = e->prev;
  395.     }
  396.     if (e->replyData != (Rpc_Opaque)0) {
  397.     free((char *)e->replyData);
  398.     }
  399.     free((char *)e);
  400.  
  401.     return (False);
  402. }
  403.  
  404. /*-
  405.  *-----------------------------------------------------------------------
  406.  * RpcCacheDestroy --
  407.  *    Clean out the cache for an RpcServer.
  408.  *
  409.  * Results:
  410.  *    None.
  411.  *
  412.  * Side Effects:
  413.  *    Frees all memory and nukes all events associated with the cache
  414.  *    entries.
  415.  *
  416.  *-----------------------------------------------------------------------
  417.  */
  418. static void
  419. RpcCacheDestroy(s)
  420.     RpcServer          *s;    /* The server whose cache should be destroyed */
  421. {
  422.     register int      i;
  423.     register CacheEntry    *e;
  424.     
  425.     for (i = 0; i < CACHE_THREADS; i++) {
  426.     for (e = s->cache[i]; e != (CacheEntry *)0; e = e->next) {
  427.         if (e->replyData != (Rpc_Opaque)0) {
  428.         free((char *)e->replyData);
  429.         }
  430.         Rpc_EventDelete(e->flushEvent);
  431.         free((char *)e);
  432.     }
  433.     }
  434. }
  435.  
  436. /*-
  437.  *-----------------------------------------------------------------------
  438.  * RpcCacheFind --
  439.  *    Find an RPC call in a server's cache. If it's not there and
  440.  *    create is True, the entry is created and *entryNewPtr is set
  441.  *    True. If the entry cannot be created, or doesn't exist and
  442.  *    create is False, NULL is returned.
  443.  *
  444.  * Results:
  445.  *    The CacheEntry for the call, or NULL if none.
  446.  *
  447.  * Side Effects:
  448.  *    A CacheEntry structure is created and linked into the cache
  449.  *    for the server.
  450.  *
  451.  *-----------------------------------------------------------------------
  452.  */
  453. static CacheEntry *
  454. RpcCacheFind(server, from, id, create, entryNewPtr)
  455.     RpcServer          *server;        /* Server in whose cache the call
  456.                      * should be sought */
  457.     register struct sockaddr_in    *from;     /* Origin of the call */
  458.     RpcID             id;             /* ID number of the call */
  459.     Boolean           create;            /* True if should create an entry
  460.                      * if we don't find it */
  461.     Boolean           *entryNewPtr;    /* Set True if a new entry was
  462.                      * created */
  463. {
  464.     register CacheEntry    *e;
  465.     register int      chain;
  466.  
  467.     chain = RpcHash(id);
  468.     e = server->cache[chain];
  469.  
  470.     /*
  471.      * Look for existing cache entry.
  472.      */
  473.     if (rpcDebug) {
  474.     printf("RpcCacheFind: seeking %d@%s #%u...",
  475.            ntohs(from->sin_port),
  476.            InetNtoA(from->sin_addr),
  477.            id);
  478.     }
  479.     while (e != (CacheEntry *)0) {
  480.     if ((e->from.sin_addr.s_addr == from->sin_addr.s_addr) &&
  481.         (e->from.sin_port == from->sin_port) &&
  482.         (RpcIDEqual(id, e->id))) {
  483.         break;
  484.     } else {
  485.         e = e->next;
  486.     }
  487.     }
  488.  
  489.     if (e == (CacheEntry *)NULL) {
  490.     if (create) {
  491.         /*
  492.          * Create new entry and link it at the head of its chain,
  493.          * setting *entryNewPtr as necessary.
  494.          */
  495.         if (rpcDebug) {
  496.         printf("creating new entry\n");
  497.         }
  498.         e = (CacheEntry *)malloc(sizeof(CacheEntry));
  499.         e->id =         id;
  500.         e->from =        *from;
  501.         e->status =     REPLY_PENDING;
  502.         e->flushEvent = Rpc_EventCreate(&flushTimeOut,
  503.                         RpcCacheFlushEntry,
  504.                         (Rpc_Opaque)e);
  505.         e->error =        RPC_SUCCESS;
  506.         e->replyData =  (Rpc_Opaque)0;
  507.         e->replySize =  0;
  508.  
  509.         e->next =        server->cache[chain];
  510.         e->prev =        (CacheEntry *)&server->cache[chain];
  511.         server->cache[chain] = e;
  512.         if (e->next != (CacheEntry *)NULL) {
  513.         e->next->prev = e;
  514.         }
  515.  
  516.         if (entryNewPtr != (Boolean *)NULL) {
  517.         *entryNewPtr = True;
  518.         }
  519.     } else {
  520.         if (rpcDebug) {
  521.         printf("returning NULL\n");
  522.         }
  523.         return ((CacheEntry *)NULL);
  524.     }
  525.     } else if (entryNewPtr != (Boolean *)NULL) {
  526.     /*
  527.      * No new entry created -- mark *entryNewPtr false to indicate this
  528.      */
  529.     if (rpcDebug) {
  530.         printf("found it\n");
  531.     }
  532.     *entryNewPtr = False;
  533.     }
  534.     
  535.     /*
  536.      * The entry was referenced, so reset the flush timer for it. Check
  537.      * for null because Rpc_Broadcast uses a cache and biffs the flush
  538.      * events for each entry in the cache.
  539.      */
  540.     if (e->flushEvent) {
  541.     Rpc_EventReset(e->flushEvent, &flushTimeOut);
  542.     }
  543.     return (e);
  544. }
  545.  
  546. /*-
  547.  *-----------------------------------------------------------------------
  548.  * RpcCheckStreams --
  549.  *    Check the set of watched streams for bad ones and remove them from
  550.  *    the set. Called by Rpc_Wait when an EBADF error is returned from
  551.  *    select().
  552.  *
  553.  * Results:
  554.  *    None.
  555.  *
  556.  * Side Effects:
  557.  *    The state field of any bad stream is set to 0 and the stream removed
  558.  *    from all bit masks.
  559.  *
  560.  *-----------------------------------------------------------------------
  561.  */
  562. static void
  563. RpcCheckStreams()
  564. {
  565.     register int      stream;
  566.     register RpcCall     *call;
  567.  
  568.     /*
  569.      * For each stream that someone is interested in, perform an innocuous
  570.      * lseek on it just to see if the descriptor itself is valid (if the
  571.      * stream is unseekable, we'll get an EINVAL error). If the descriptor
  572.      * is bad, clear it out of all the select masks and set its state to 0.
  573.      */
  574.     for (stream=0; stream < FD_SETSIZE; stream++){
  575.     if (streams[stream].state) {
  576.         errno = 0;
  577.         if ((lseek(stream, 0, L_INCR) < 0) &&
  578.         (errno == EBADF))
  579.         {
  580.         if (rpcDebug) {
  581.             printf("%d bad\n", stream);
  582.             fflush(stdout);
  583.         }
  584.         FD_CLR(stream, &rpc_readMask);
  585.         FD_CLR(stream, &rpc_writeMask);
  586.         FD_CLR(stream, &rpc_exceptMask);
  587.         streams[stream].state = 0;
  588.  
  589.         /*
  590.          * Mark all the calls on this stream as failed.
  591.          *
  592.          * XXX: What about servers?
  593.          */
  594.         for (call = rpcCalls[stream];
  595.              call != (RpcCall *)NULL;
  596.              call = call->next)
  597.         {
  598.             call->replied = True;
  599.             call->status = RPC_CANTSEND;
  600.         }
  601.         }
  602.     }
  603.     }
  604. }
  605.  
  606. /*-
  607.  *-----------------------------------------------------------------------
  608.  * RpcHandleStream --
  609.  *    Handle incoming data on an rpc stream, be it a call or a reply.
  610.  *
  611.  * Results:
  612.  *    None.
  613.  *
  614.  * Side Effects:
  615.  *    If it is a call, the appropriate server function is called. If
  616.  *    it is a reply, the replied, remote and status fields of the
  617.  *    RpcCall structure for the call are altered and the RpcCall
  618.  *    structure removed from the list of calls for the socket.
  619.  *
  620.  *-----------------------------------------------------------------------
  621.  */
  622. /*ARGSUSED*/
  623. static void
  624. RpcHandleStream(stream, data, what)
  625.     int                  stream;       /* Stream that's ready */
  626.     Rpc_Opaque           data;            /* Data we stored (UNUSED) */
  627.     int                  what;            /* What it's ready for (UNUSED) */
  628. {
  629.     struct sockaddr_in    remote;            /* Address of sender */
  630.     int                  remotelen;      /* size of 'remote' (for recvfrom) */
  631.     struct {
  632.     MsgHeader         header;
  633.     unsigned char        buf[MAX_DATA_SIZE];
  634.     }                  message;        /* Place for the message coming in */
  635.     int                  msgLen;            /* Length of actual message */
  636.  
  637.     /*
  638.      * Keep trying to read the message as long as the recvfrom call is
  639.      * interrupted.
  640.      */
  641.     do {
  642.     remote.sin_addr.s_addr = 0;
  643.     remotelen = sizeof(remote);
  644.     msgLen = recvfrom(stream, (char *)&message, sizeof(message), 0,
  645.               (struct sockaddr *)&remote, &remotelen);
  646.     } while ((msgLen < 0) && (errno == EINTR));
  647.     
  648.     /*
  649.      * recvfrom may not actually return the address for a connected tcp stream.
  650.      * Since we need the beastie, we must query the system by hand in such a
  651.      * case.
  652.      */
  653.     if ((msgLen > 0) && (remote.sin_addr.s_addr == 0)) {
  654.     remotelen = sizeof(remote);
  655.     if ((getpeername(stream, (struct sockaddr *)&remote,
  656.              &remotelen) < 0) ||
  657.         (remote.sin_addr.s_addr == 0))
  658.     {
  659.         if (rpcDebug) {
  660.         printf ("Could not get address of caller\n");
  661.         }
  662.         return;
  663.     }
  664.     }
  665.     
  666.     if (msgLen < (int)sizeof(MsgHeader)) {
  667.     if (msgLen < 0) {
  668.         if (errno == ENOTCONN) {
  669.         /*
  670.          * The stream must be a passive TCP socket. Accept on it
  671.          * to create a new rpc socket...
  672.          */
  673.         RpcTcpAccept(stream);
  674.         } else {
  675.         perror("recvfrom");
  676.         }
  677.     } else if (msgLen == 0) {
  678.         /*
  679.          * If we received an end-of-file, we assume the service is at
  680.          * an end and close the thing down...Any pending calls are
  681.          * marked timed out, since we won't be able to receive any
  682.          * replies on this socket.
  683.          */
  684.         register RpcServer    *s;
  685.         register RpcCall    *c;
  686.  
  687.         if (rpcDebug) {
  688.         printf("EOF on %d -- closing\n", stream);
  689.         }
  690.  
  691.         for (s = rpcServers[stream];
  692.          s != (RpcServer *)0;
  693.          s = rpcServers[stream]) {
  694.              RpcCacheDestroy(s);
  695.              rpcServers[stream] = s->next;
  696.              free((char *)s);
  697.         }
  698.         for (c = rpcCalls[stream];
  699.          c != (RpcCall *)0;
  700.          c = rpcCalls[stream]) {
  701.              c->replied = True;
  702.              c->status = RPC_TIMEDOUT;
  703.              rpcCalls[stream] = c->next;
  704.         }
  705.         
  706.         Rpc_Ignore(stream);
  707.         (void) close(stream);
  708.     } else if(rpcDebug) {
  709.         printf("Incomplete header received (%d bytes)\n", msgLen);
  710.     }
  711.     return;
  712.     }
  713.  
  714.     /*
  715.      * Message received. Byte swap the header to match the local machine's
  716.      * byte-order.
  717.      */
  718.     message.header.id =         ntohl(message.header.id);
  719.     message.header.procNum =    ntohs(message.header.procNum);
  720.     message.header.flags =    ntohs(message.header.flags);
  721.     message.header.length =    ntohl(message.header.length);
  722.  
  723.     if (msgLen - (int)sizeof(MsgHeader) < message.header.length) {
  724.     if (rpcDebug) {
  725.         printf ("Incomplete message received: was %d, s/b %d\n",
  726.             msgLen - sizeof(MsgHeader),
  727.             message.header.length);
  728.     }
  729.     return;
  730.     }
  731.  
  732.     if (RpcIsCall(&message.header)) {
  733.     /*
  734.      * Find server and call out.
  735.      */
  736.     register RpcServer  *server;
  737.     RpcMessage        msg;
  738.     CacheEntry        *e;
  739.     Boolean              isNew;
  740.  
  741.     msg.header = &message.header;
  742.     msg.remote = &remote;
  743.     msg.sock   = stream;
  744.     msg.e       = (CacheEntry *)NULL;
  745.     
  746.     if (rpcDebug) {
  747.         printf("Call on %d: id %u procedure %d\n",
  748.            stream,
  749.            message.header.id,
  750.            message.header.procNum);
  751.     }
  752.     
  753.     for (server = rpcServers[stream];
  754.          server != (RpcServer *)0;
  755.          server = server->next)
  756.     {
  757.         if (server->procNum == message.header.procNum) {
  758.         break;
  759.         }
  760.     }
  761.     if (server == (RpcServer *)0) {
  762.         if (rpcDebug) {
  763.         printf("No such procedure\n");
  764.         }
  765.         Rpc_Error((Rpc_Message)&msg, RPC_NOPROC);
  766.         return;
  767.     }
  768.         
  769.     /*
  770.      * XXX: Should do more error checking (e.g. does length match?)
  771.      */
  772.     e = RpcCacheFind(server, &remote, message.header.id, True, &isNew);
  773.     msg.e = e;
  774.     
  775.     if (isNew) {
  776.         msg.server = server;
  777.         /*
  778.          * Check their byte-order against ours and call the swap procedure
  779.          * if it's not the same.
  780.          */
  781.         if ((message.header.byteOrder != RPC_MAGIC) &&
  782.         (server->swapArgsProc != Rpc_SwapNull))
  783.         {
  784.         (* server->swapArgsProc) (message.header.length,
  785.                       (Rpc_Opaque)message.buf,
  786.                       server->data);
  787.         }
  788.  
  789.         (* server->serverProc) (&remote, (Rpc_Message)&msg,
  790.                     message.header.length,
  791.                     (Rpc_Opaque)message.buf);
  792.  
  793.         /*
  794.          * Make sure the server generated some reply. If not, generate
  795.          * a null-reply. Note we don't do this for broadcast messages
  796.          * as they only get explicit replies -- the server may just have
  797.          * decided the message wasn't really for it. Broadcasts are
  798.          * rather general, after all.
  799.          */
  800.         if ((e->status != REPLY_SENT) &&
  801.         ((message.header.flags & RPC_BROADCAST) == 0))
  802.         {
  803.         printf("No reply sent for call %u to procedure %d on %d\n",
  804.                message.header.id,
  805.                message.header.procNum,
  806.                stream);
  807.         printf("Generating zero-length reply...\n");
  808.         Rpc_Return((Rpc_Message)&msg, 0, (Rpc_Opaque)NULL);
  809.         }
  810.     } else {
  811.         if (e->status == REPLY_PENDING) {
  812.         /*
  813.          * This call is already being serviced. Return an explicit
  814.          * acknowledgement to the sender to let it know we're still
  815.          * alive and working, then drop the request. This is *not*
  816.          * done for broadcast requests -- if a reply is needed,
  817.          * it is done explicitly or not at all.
  818.          */
  819.         register MsgHeader  *header;
  820.         int                numBytes;
  821.  
  822.         if (rpcDebug) {
  823.             printf("call cached: sending ACK\n");
  824.         }
  825.         header = &message.header;
  826.         if ((header->flags & RPC_BROADCAST) == 0) {
  827.             header->procNum = htons(header->procNum);
  828.             header->flags ^= (RPC_ACKNOWLEDGE|RPC_CALL);
  829.             header->flags = htons(header->flags);
  830.             header->length = 0;
  831.             
  832.             do {
  833.             numBytes = sendto(stream,
  834.                       (char *)header, sizeof(MsgHeader),
  835.                       0,
  836.                       (struct sockaddr *)&remote,
  837.                       sizeof(remote));
  838.             } while ((numBytes < 0) && (errno == EINTR));
  839.         }
  840.         } else {
  841.         /*
  842.          * A reply has already been sent for this call. We use
  843.          * Rpc_Return to return the data already sent. Note
  844.          * that Rpc_Return can tell when it's a resend by the status
  845.          * being REPLY_SENT...Again, the request is dropped.
  846.          */
  847.         if (e->error == RPC_SUCCESS) {
  848.             if(rpcDebug) {
  849.             printf("%d byte reply cached: resending\n",
  850.                    e->replySize);
  851.             }
  852.             Rpc_Return((Rpc_Message)&msg, e->replySize, e->replyData);
  853.         } else {
  854.             if (rpcDebug) {
  855.             printf("error %d cached: resending\n", e->error);
  856.             }
  857.             Rpc_Error((Rpc_Message)&msg, e->error);
  858.         }
  859.         }
  860.     }
  861.     } else {
  862.     /*
  863.      * It's a reply of some sort. Find the associated call...
  864.      */
  865.     register RpcCall    *call;
  866.     register RpcCall    **prev;
  867.  
  868.     if (rpcDebug) {
  869.         printf ("Reply to %u: ", message.header.id);
  870.     }
  871.     
  872.     prev = &rpcCalls[stream];
  873.     for (call = rpcCalls[stream]; call != (RpcCall *)0; call = call->next){
  874.         if (RpcIDEqual(message.header.id, call->id)) {
  875.         break;
  876.         } else {
  877.         prev = &call->next;
  878.         }
  879.     }
  880.     if (call != (RpcCall *)0) {
  881.         switch (message.header.flags & RpcTypeMask) {
  882.         case RPC_REPLY:
  883.             /*
  884.              * The message is a real reply.
  885.              *
  886.              * If the returned data are too big for the buffer the
  887.              * caller passed, drop the packet and signal an RPC_TOOBIG
  888.              * error.
  889.              *
  890.              * Else, copy the returned data to the buffer supplied by
  891.              * the caller and mark the call as successful.
  892.              */
  893.             if (message.header.length > call->replyLen) {
  894.             call->status = RPC_TOOBIG;
  895.             if (rpcDebug) {
  896.                 printf("too big\n");
  897.             }
  898.             } else {
  899.             if (message.header.length != 0) {
  900.                 bcopy ((char *)message.buf,
  901.                    (char *)call->reply,
  902.                    message.header.length);
  903.             }
  904.             call->replyLen = message.header.length;
  905.             if (rpcDebug) {
  906.                 printf ("%d bytes received\n",
  907.                     message.header.length);
  908.             }
  909.             call->status = RPC_SUCCESS;
  910.             }
  911.             break;
  912.         case RPC_ERROR:
  913.             /*
  914.              * The message is an error reply. The data for the message
  915.              * are the return status for the call -- in network byte
  916.              * order -- and we copy it directly into call->status.
  917.              */
  918.             call->status = *(Rpc_Stat *)message.buf;
  919.             call->status = ntohl(call->status);
  920.             if (rpcDebug) {
  921.             printf ("error %d\n", call->status);
  922.             }
  923.             break;
  924.         case RPC_ACKNOWLEDGE:
  925.             /*
  926.              * Server is acknowledging our call. Up the number of
  927.              * retries allowed for the call once for each
  928.              * acknowledegment received. This effectively forgets
  929.              * we ever resent the request.
  930.              */
  931.             if (rpcDebug) {
  932.             printf ("ACK\n");
  933.             }
  934.             call->numRetries += 1;
  935.             return;
  936.         default:
  937.             if (rpcDebug) {
  938.             printf("bogus message received on %d\n", stream);
  939.             }
  940.             return;
  941.         }
  942.         /*
  943.          * Cleanup: If the message got a real reply, (RPC_REPLY or
  944.          * RPC_ERROR), we'll get down here. We first remove the call
  945.          * from the list of those pending, then mark the message as
  946.          * replied-to and save the remote address.
  947.          */
  948.         *prev = call->next;
  949.         call->replied = True;
  950.         call->remote = remote;
  951.     } else {
  952.         if (rpcDebug) {
  953.         printf("no such message queued\n");
  954.         }
  955.     }
  956.     }
  957. }
  958.  
  959. /*-
  960.  *-----------------------------------------------------------------------
  961.  * Rpc_MessageSocket --
  962.  *    Return the socket used to receive the given Rpc_Message.
  963.  *
  964.  * Results:
  965.  *    The above-mentioned socket.
  966.  *
  967.  * Side Effects:
  968.  *    None.
  969.  *
  970.  *-----------------------------------------------------------------------
  971.  */
  972. int
  973. Rpc_MessageSocket(msg)
  974.     Rpc_Message          msg;        /* Received message */
  975. {
  976.     return(((RpcMessage *)msg)->sock);
  977. }
  978.  
  979. /*-
  980.  *-----------------------------------------------------------------------
  981.  * RpcResend --
  982.  *    (Re)Send a call to an rpc server. If numRetries is 0, aborts the
  983.  *    call with an RPC_TIMEDOUT error.
  984.  *
  985.  * Results:
  986.  *    None.
  987.  *
  988.  * Side Effects:
  989.  *    The replied and status fields of the RpcCall structure for the
  990.  *    call may be altered and the RpcCall structure removed from the
  991.  *    list of pending calls.
  992.  *
  993.  *-----------------------------------------------------------------------
  994.  */
  995. static Boolean
  996. RpcResend(call)
  997.     RpcCall           *call;      /* Record for message to be sent */
  998. {
  999.     register RpcCall    *c;         /* Current call in list */
  1000.     register RpcCall    **prev;        /* Pointer to next field of previous call*/
  1001.     int                  numBytes;   /* Number of bytes in message */
  1002.     Rpc_Stat          status;        /* Status of call */
  1003.     MsgHeader          *header;    /* Header of message being resent (for
  1004.                      * debug output) */
  1005.     
  1006.     header = (MsgHeader *)call->message.msg_iov[0].iov_base;
  1007.     
  1008.     if (!call->replied) {
  1009.     if (rpcDebug) {
  1010.         printf("Resending %u: ", ntohl(header->id));
  1011.     }
  1012.     if (call->numRetries != 0) {
  1013.         call->numRetries -= 1;
  1014.         if (rpcDebug) {
  1015.         printf("%d left\n", call->numRetries);
  1016.         }
  1017.     send_again:
  1018.         do {
  1019.         numBytes = sendmsg (call->sock, &call->message, 0);
  1020.         } while ((numBytes < 0) && (errno == EINTR));
  1021.  
  1022.         if (numBytes < 0) {
  1023.         if (rpcDebug) {
  1024.             perror("RpcResend");
  1025.         }
  1026.         switch(errno) {
  1027.             case EMSGSIZE:
  1028.             status = RPC_TOOBIG;
  1029.             break;
  1030.             case ENOTCONN:
  1031.             /*
  1032.              * Socket is a disconnected TCP socket. Connect it to
  1033.              * the server to which this call is directed. XXX: This
  1034.              * connection is irreversible. If the connection
  1035.              * succeeds, resend the message.
  1036.              */
  1037.             if (connect(call->sock,
  1038.                     (struct sockaddr *)call->message.msg_name,
  1039.                     call->message.msg_namelen) == 0)
  1040.             {
  1041.                 goto send_again;
  1042.             } else {
  1043.                 if (rpcDebug) {
  1044.                 perror("connect");
  1045.                 }
  1046.             }
  1047.             /*FALLTHRU*/
  1048.             default:
  1049.             status = RPC_CANTSEND;
  1050.             break;
  1051.         }
  1052.         } else {
  1053.         status = RPC_SUCCESS;
  1054.         }
  1055.     } else {
  1056.         if (rpcDebug) {
  1057.         printf("TIMEOUT\n");
  1058.         }
  1059.         status = RPC_TIMEDOUT;
  1060.     }
  1061.     if (status != RPC_SUCCESS) {
  1062.         /*
  1063.          * If the resend was unsuccessful, mark the call as replied-to and
  1064.          * install the status as the response. Then remove the call from
  1065.          * the list of calls pending for the socket and tell Rpc_Wait not
  1066.          * to go to sleep so the message sender can be woken up as soon as
  1067.          * possible
  1068.          */
  1069.         call->replied = True;
  1070.         call->status = status;
  1071.         
  1072.         prev = &rpcCalls[call->sock];
  1073.         for (c = rpcCalls[call->sock]; c != (RpcCall *)0; c = c->next) {
  1074.         if (c == call) {
  1075.             break;
  1076.         } else {
  1077.             prev = &c->next;
  1078.         }
  1079.         }
  1080.         if (c != (RpcCall *)0) {
  1081.         *prev = c->next;
  1082.         }
  1083.         return (True);
  1084.     } else {
  1085.         /*
  1086.          * Tell Rpc_Wait it's ok to go to sleep, if it wants to. Nothing
  1087.          * interesting will happen for this call until a response comes
  1088.          * back.
  1089.          */
  1090.         return (False);
  1091.     }
  1092.     } else {
  1093.     /*
  1094.      * If the message has already been replied-to, we don't want to
  1095.      * go to sleep. Rather, Rpc_Wait should return to its caller so
  1096.      * the message may be processed as quickly as possible.
  1097.      */
  1098.     if(rpcDebug) {
  1099.         printf("Resend on replied-to message %u\n", ntohl(header->id));
  1100.     }
  1101.     return(True);
  1102.     }
  1103. }
  1104.  
  1105. /*-
  1106.  *-----------------------------------------------------------------------
  1107.  * RpcQueueEvent --
  1108.  *    Place an RpcEvent on the event queue in time-order.
  1109.  *
  1110.  * Results:
  1111.  *    None.
  1112.  *
  1113.  * Side Effects:
  1114.  *    The events list is altered to contain the given event.
  1115.  *
  1116.  *-----------------------------------------------------------------------
  1117.  */
  1118. static void
  1119. RpcQueueEvent(ev)
  1120.     register RpcEvent    *ev;
  1121. {
  1122.     register RpcEvent    *e;
  1123.     register RpcEvent    **prev;
  1124.     
  1125.     if (rpcDebug) {
  1126.     printf ("Queueing event %x (timeout = %d)\n", ev, ev->timeout);
  1127.     }
  1128.     prev = &events;
  1129.     for (e = *prev; e != (RpcEvent *)0; e = *prev) {
  1130.     if (timercmp(&ev->timeout, &e->timeout, <)) {
  1131.         break;
  1132.     } else {
  1133.         prev = &e->next;
  1134.     }
  1135.     }
  1136.     ev->next = e;
  1137.     *prev = ev;
  1138. }
  1139.  
  1140. /*-
  1141.  *-----------------------------------------------------------------------
  1142.  * Rpc_EventCreate --
  1143.  *    Create an event and place it on the queue of events.
  1144.  *
  1145.  * Results:
  1146.  *    An opaque whatsit to be used for deleting the event, if necessary.
  1147.  *
  1148.  * Side Effects:
  1149.  *    An RpcEvent structure is created and placed on the queue.
  1150.  *
  1151.  *-----------------------------------------------------------------------
  1152.  */
  1153. Rpc_Event
  1154. Rpc_EventCreate(interval, handler, data)
  1155.     struct timeval    *interval;      /* Timeout period for event */
  1156.     Boolean              (*handler)();    /* Function to handle timeout */
  1157.     Rpc_Opaque           data;            /* Datum to pass it */
  1158. {
  1159.     register RpcEvent    *ev;
  1160.  
  1161.     ev = (RpcEvent *)malloc(sizeof(RpcEvent));
  1162.     (void)gettimeofday(&ev->timeout, (struct timezone *)0);
  1163.     timeadd(&ev->timeout,interval,&ev->timeout);
  1164.     ev->interval = *interval;
  1165.     ev->handler = handler;
  1166.     ev->data = data;
  1167.  
  1168.     if (rpcDebug) {
  1169.     printf("Created event %x\n", ev);
  1170.     }
  1171.     RpcQueueEvent(ev);
  1172.     return((Rpc_Event)ev);
  1173. }
  1174.  
  1175. /*-
  1176.  *-----------------------------------------------------------------------
  1177.  * Rpc_EventDelete --
  1178.  *    Remove an event from the event queue.
  1179.  *
  1180.  * Results:
  1181.  *    None.
  1182.  *
  1183.  * Side Effects:
  1184.  *    The given event is removed from the event queue.
  1185.  *
  1186.  *-----------------------------------------------------------------------
  1187.  */
  1188. void
  1189. Rpc_EventDelete(event)
  1190.     Rpc_Event          event;    /* Event to remove */
  1191. {
  1192.     register RpcEvent    *ev;
  1193.     register RpcEvent    *e;
  1194.     register RpcEvent    **prev;
  1195.     register int      caller;
  1196.  
  1197.     if (rpcDebug) {
  1198.     printf("Deleting event %x...", event);
  1199.     }
  1200.     ev = (RpcEvent *)event;
  1201.     prev = &events;
  1202.     for (e = *prev; e != (RpcEvent *)0; e = *prev) {
  1203.     if (e == ev) {
  1204.         break;
  1205.     } else {
  1206.         prev = &e->next;
  1207.     }
  1208.     }
  1209.     if (e != (RpcEvent *)0) {
  1210.     *prev = e->next;
  1211.     if (rpcDebug) {
  1212.         printf("\n");
  1213.     }
  1214.     bzero(e, sizeof(*e));
  1215.     free((char *)e);
  1216.     } else if (rpcDebug) {
  1217.     printf("non-existent\n");
  1218. #ifdef notdef
  1219.     } else {
  1220.     asm("movl a6@(4),d7");
  1221.     printf("0x%x deleting non-existent event %x\n",
  1222.            caller, event);
  1223.     for (e = events; e != (RpcEvent *)0; e = e->next) {
  1224.         printf("%x expires at %d.%06d (handler=0x%x)\n",
  1225.            e,
  1226.            e->timeout.tv_sec,
  1227.            e->timeout.tv_usec,
  1228.            e->handler);
  1229.     }
  1230. #endif notdef
  1231.     }
  1232. }
  1233.  
  1234. /*-
  1235.  *-----------------------------------------------------------------------
  1236.  * Rpc_EventReset --
  1237.  *    Reset the time of an existing event. Event is moved to interval
  1238.  *    seconds from now.
  1239.  *
  1240.  * Results:
  1241.  *    None.
  1242.  *
  1243.  * Side Effects:
  1244.  *    The event is moved in the event queue.
  1245.  *
  1246.  *-----------------------------------------------------------------------
  1247.  */
  1248. void
  1249. Rpc_EventReset(event, interval)
  1250.     Rpc_Event          event;        /* Event to alter */
  1251.     struct timeval    *interval;  /* New interval */
  1252. {
  1253.     register RpcEvent    *ev;
  1254.     register RpcEvent    *e;
  1255.     register RpcEvent    **prev;
  1256.     struct timeval    now;
  1257.  
  1258.     if (rpcDebug) {
  1259.     printf("Reseting event %x...", event);
  1260.     }
  1261.     ev = (RpcEvent *)event;
  1262.     prev = &events;
  1263.  
  1264.     for (e = events; e != (RpcEvent *)0; e = e->next) {
  1265.     if (e == ev) {
  1266.         break;
  1267.     } else {
  1268.         prev = &e->next;
  1269.     }
  1270.     }
  1271.  
  1272.     if (e != (RpcEvent *)0) {
  1273.     if (rpcDebug) {
  1274.         printf("\n");
  1275.     }
  1276.     *prev = e->next;
  1277.     } else if (rpcDebug) {
  1278.     printf("nonexistent\n");
  1279.     }
  1280.     ev->interval = *interval;
  1281.     (void)gettimeofday(&now, (struct timezone *)0);
  1282.     timeadd(&now, &ev->interval, &ev->timeout);
  1283.     RpcQueueEvent(ev);
  1284. }
  1285.  
  1286. /*-
  1287.  *-----------------------------------------------------------------------
  1288.  * Rpc_Wait --
  1289.  *    Wait for something to happen -- either an event to timeout or a
  1290.  *    stream to become ready. Call all appropriate handler functions
  1291.  *    and return.
  1292.  *
  1293.  * Results:
  1294.  *    None.
  1295.  *
  1296.  * Side Effects:
  1297.  *    Events may be removed from the event queue.
  1298.  *
  1299.  *-----------------------------------------------------------------------
  1300.  */
  1301. void
  1302. Rpc_Wait()
  1303. {
  1304.     struct timeval    now;            /* Current time */
  1305.     struct timeval    tv;             /* Actual interval to wait */
  1306.     struct timeval    *timeout;       /* Pointer to interval to wait */
  1307.     register RpcEvent    *ev;            /* Current event */
  1308.     Boolean           stayAwake;      /* True if shouldn't go to sleep */
  1309.     fd_set              readMask,
  1310.             writeMask,
  1311.             exceptMask;
  1312.     int            nstreams;
  1313.  
  1314.     if (rpcDebug) {
  1315.     printf("Rpc_Wait:\n");
  1316.     }
  1317.     while (1) {
  1318.     stayAwake = False;
  1319.     timeout = (struct timeval *)0;
  1320.  
  1321.     /*
  1322.      * First handle any timeout event whose time has passed...We have
  1323.      * to get the current time each time through because one of the event
  1324.      * routines could have recursed and taken a long time. In such a case,
  1325.      * when we get back to this level, we'll go to sleep for a lot longer
  1326.      * than we really want. It's only 100 usecs per gettimeofday call,
  1327.      * anyway...
  1328.      */
  1329.     for (ev = events, (void)gettimeofday(&now, (struct timezone *)0);
  1330.          ev != (RpcEvent *)0 && timercmp(&now, &ev->timeout, >);
  1331.          ev = events, (void)gettimeofday(&now, (struct timezone *)0))
  1332.     {
  1333.         events = ev->next;
  1334.         /*
  1335.          * Set the time for the event's recurrence. In the past
  1336.          * this was the interval added to the timeout. This
  1337.          * can cause the process to become swamped, however,
  1338.          * so now add it to the current time so even if the
  1339.          * event is taken late, it will be taken again after the
  1340.          * given delay.
  1341.          */
  1342.         timeadd(&ev->interval,&now,&ev->timeout);
  1343.         RpcQueueEvent(ev);
  1344.         if (rpcDebug) {
  1345.         printf("\ttaking event %x\n", ev);
  1346.         }
  1347.         stayAwake = (*ev->handler) (ev->data, ev) || stayAwake;
  1348.     }
  1349.     if (stayAwake) {
  1350.         /*
  1351.          * If we're not to go to sleep, return to the caller.
  1352.          */
  1353.         if (rpcDebug) {
  1354.         printf("\tstaying awake\n");
  1355.         }
  1356.         return;
  1357.     }
  1358.     if (ev != (RpcEvent *)0) {
  1359.         /*
  1360.          * There's still an event pending, so figure out the time to its
  1361.          * expiration and point 'timeout' at it.
  1362.          */
  1363.         tv.tv_usec = ev->timeout.tv_usec - now.tv_usec;
  1364.         if (tv.tv_usec < 0) {
  1365.         tv.tv_usec += 1000000;
  1366.         tv.tv_sec = ev->timeout.tv_sec - now.tv_sec - 1;
  1367.         } else {
  1368.         tv.tv_sec = ev->timeout.tv_sec - now.tv_sec;
  1369.         }
  1370.         timeout = &tv;
  1371.     }
  1372.     readMask = rpc_readMask;
  1373.     writeMask = rpc_writeMask;
  1374.     exceptMask = rpc_exceptMask;
  1375.     errno = 0;
  1376.     if (rpcDebug) {
  1377.         printf("\tread(%x), write(%x), except(%x)",
  1378.            readMask.fds_bits[0],
  1379.            writeMask.fds_bits[0],
  1380.            exceptMask.fds_bits[0]);
  1381.         if (timeout) {
  1382.         printf(", to(%d.%06d)\n", timeout->tv_sec, timeout->tv_usec);
  1383.         } else {
  1384.         printf("\n");
  1385.         }
  1386.     }
  1387.     nstreams = select(FD_SETSIZE,&readMask,&writeMask,&exceptMask,timeout);
  1388.     if (nstreams > 0) {
  1389.         /*
  1390.          * Something is ready. Find it and call its handler function.
  1391.          * For each stream that's ready, we find all the things it's ready
  1392.          * for and stick the appropriate RPC_*ABLE constants in 'what',
  1393.          * removing the stream from the various masks as we go. The handler
  1394.          * is called once for each ready stream.
  1395.          *
  1396.          * Once all the streams have been handled, we break out of the loop
  1397.          * and return.
  1398.          */
  1399.         register int base;
  1400.         fd_mask     rmask,
  1401.             wmask,
  1402.             emask;
  1403.         register int stream;
  1404.         register int what;
  1405.         register fd_mask tmask;
  1406.  
  1407.         if (rpcDebug) {
  1408.         printf("result:\n");
  1409.         fflush(stdout);
  1410.         }
  1411.         
  1412.         for (base = 0,
  1413.          rmask = readMask.fds_bits[0],
  1414.          wmask = writeMask.fds_bits[0],
  1415.          emask = exceptMask.fds_bits[0];
  1416.  
  1417.          base < sizeof(rpc_readMask.fds_bits)/sizeof(fd_mask);
  1418.  
  1419.          base++,
  1420.          rmask = readMask.fds_bits[base],
  1421.          wmask = writeMask.fds_bits[base],
  1422.          emask = exceptMask.fds_bits[base])
  1423.         {
  1424.         if (rpcDebug) {
  1425.             printf("\tread(%x), write(%x), except(%x)\n",
  1426.                rmask, wmask, emask);
  1427.             fflush(stdout);
  1428.         }
  1429.         
  1430. #define CHKSTR(n,mask,what) \
  1431.     if (!FD_ISSET((n), &(mask))) { \
  1432.     continue; \
  1433.     }
  1434.         while(rmask) {
  1435.             stream = ffs(rmask) - 1;
  1436.             tmask = 1 << stream;
  1437.             
  1438.             stream += base * (sizeof(fd_mask) * NBBY);
  1439.  
  1440.             rmask &= ~tmask;
  1441.             what = RPC_READABLE;
  1442.  
  1443. CHKSTR(stream, rpc_readMask, "reading");
  1444.             
  1445.             
  1446.             if (rpcDebug) {
  1447.             printf("\t%d: read", stream);
  1448.             }    
  1449.             if (wmask & tmask) {
  1450.             wmask &= ~tmask;
  1451. CHKSTR(stream, rpc_writeMask, "writing");
  1452.             
  1453.             what |= RPC_WRITABLE;
  1454.             if (rpcDebug) {
  1455.                 printf(",write");
  1456.             }
  1457.             }
  1458.             if (emask & tmask) {
  1459.             emask &= ~tmask;
  1460. CHKSTR(stream, rpc_exceptMask, "excepting");
  1461.             
  1462.             what |= RPC_EXCEPTABLE;
  1463.             if(rpcDebug) {
  1464.                 printf(",except");
  1465.             }
  1466.             
  1467.             }
  1468.             if (rpcDebug) {
  1469.             putchar('\n');
  1470.             }
  1471.             (*streams[stream].handler) (stream,
  1472.                         streams[stream].data,
  1473.                         what);
  1474.         }
  1475.         while (wmask != 0) {
  1476.             stream = ffs(wmask) - 1;
  1477.             tmask = 1 << stream;
  1478.             stream += base * (sizeof(fd_mask)*NBBY);
  1479.             wmask &= ~tmask;
  1480.             what = RPC_WRITABLE;
  1481.             
  1482. CHKSTR(stream, rpc_writeMask, "writing");
  1483.             
  1484.             if (rpcDebug) {
  1485.             printf("\t%d: write", stream);
  1486.             }
  1487.             if (emask & tmask) {
  1488.             emask &= ~tmask;
  1489.             what |= RPC_EXCEPTABLE;
  1490. CHKSTR(stream, rpc_exceptMask, "excepting");
  1491.             
  1492.             if (rpcDebug) {
  1493.                 printf(",except");
  1494.             }
  1495.             }
  1496.             if (rpcDebug) {
  1497.             putchar('\n');
  1498.             }
  1499.             (*streams[stream].handler) (stream,
  1500.                         streams[stream].data,
  1501.                         what);
  1502.         }
  1503.         while (emask != 0) {
  1504.             stream = ffs(emask) - 1;
  1505.             tmask = 1 << stream;
  1506.             stream += base * (sizeof(fd_mask)*NBBY);
  1507.             emask &= ~tmask;
  1508. CHKSTR(stream, rpc_exceptMask, "excepting");
  1509.             
  1510.             if(rpcDebug) {
  1511.             printf("\t%d:except\n", stream);
  1512.             }
  1513.             (* streams[stream].handler) (stream,
  1514.                          streams[stream].data,
  1515.                          RPC_EXCEPTABLE);
  1516.         }
  1517.         }
  1518.         return;
  1519.     } else if (nstreams < 0) {
  1520.         /*
  1521.          * Error
  1522.          */
  1523.         if (errno == EBADF) {
  1524.         /*
  1525.          * Some file descriptor was bad -- find it and nuke it
  1526.          */
  1527.         RpcCheckStreams();
  1528.         } else if (errno == EINTR) {
  1529.         /*
  1530.          * Allow signals to make us return.
  1531.          */
  1532.         return;
  1533.         } else if (rpcDebug) {
  1534.         perror("select");
  1535.         }
  1536.     }
  1537.     }
  1538. }
  1539.  
  1540. /*-
  1541.  *-----------------------------------------------------------------------
  1542.  * Rpc_Watch --
  1543.  *    Pay attention to the state of the given stream. Any previous
  1544.  *    handler/state is overridden.
  1545.  *
  1546.  * Results:
  1547.  *    None.
  1548.  *
  1549.  * Side Effects:
  1550.  *    The data for 'streams[sock]' is altered. rpc_readMask,
  1551.  *    rpc_writeMask and rpc_exceptMask may be changed.
  1552.  *
  1553.  *-----------------------------------------------------------------------
  1554.  */
  1555. void
  1556. Rpc_Watch(stream, state, handler, data)
  1557.     int              stream;       /* Stream to observe */
  1558.     int              state;        /* State to watch for */
  1559.     void          (*handler)();    /* Function to call when state acheived */
  1560.     Rpc_Opaque       data;            /* Datum to pass it */
  1561. {
  1562.     if (streams[stream].state != 0) {
  1563.     if (streams[stream].state & RPC_READABLE) {
  1564.         FD_CLR(stream, &rpc_readMask);
  1565.     }
  1566.     if (streams[stream].state & RPC_WRITABLE) {
  1567.         FD_CLR(stream, &rpc_writeMask);
  1568.     }
  1569.     if (streams[stream].state & RPC_EXCEPTABLE) {
  1570.         FD_CLR(stream, &rpc_exceptMask);
  1571.     }
  1572.     }
  1573.     streams[stream].state = state;
  1574.     streams[stream].handler = handler;
  1575.     streams[stream].data = data;
  1576.     if (state & RPC_READABLE) {
  1577.     FD_SET(stream, &rpc_readMask);
  1578.     }
  1579.     if (state & RPC_WRITABLE) {
  1580.     FD_SET(stream, &rpc_writeMask);
  1581.     }
  1582.     if (state & RPC_EXCEPTABLE) {
  1583.     FD_SET(stream, &rpc_exceptMask);
  1584.     }
  1585. }
  1586.  
  1587. /*-
  1588.  *-----------------------------------------------------------------------
  1589.  * Rpc_Ignore --
  1590.  *    Ignore the state of the given stream.
  1591.  *
  1592.  * Results:
  1593.  *    None.
  1594.  *
  1595.  * Side Effects:
  1596.  *    The stream is removed from all the select masks.
  1597.  *
  1598.  *-----------------------------------------------------------------------
  1599.  */
  1600. void
  1601. Rpc_Ignore(stream)
  1602.     int              stream;       /* Stream to ignore */
  1603. {
  1604.     register int  mask;
  1605.  
  1606.     streams[stream].state = 0;
  1607.     FD_CLR(stream, &rpc_readMask);
  1608.     FD_CLR(stream, &rpc_writeMask);
  1609.     FD_CLR(stream, &rpc_exceptMask);
  1610. }
  1611.  
  1612. /*-
  1613.  *-----------------------------------------------------------------------
  1614.  * Rpc_Error --
  1615.  *    Generate an error response for an RPC call.
  1616.  *
  1617.  * Results:
  1618.  *    None.
  1619.  *
  1620.  * Side Effects:
  1621.  *    An error packet is sent if the call wasn't a broadcast.
  1622.  *
  1623.  *-----------------------------------------------------------------------
  1624.  */
  1625. void
  1626. Rpc_Error(rpcMsg, stat)
  1627.     Rpc_Message          rpcMsg;        /* Message to respond to */
  1628.     Rpc_Stat          stat;        /* Status to return */
  1629. {
  1630.     register RpcMessage    *realMsg = (RpcMessage *)rpcMsg;
  1631.     struct {
  1632.     MsgHeader         header;
  1633.     Rpc_Stat          stat;
  1634.     }                  errorMsg;
  1635.  
  1636.     if (rpcDebug) {
  1637.     printf("error on %d: code %d, procedure %d, id %u\n",
  1638.            realMsg->sock, stat, realMsg->header->procNum,
  1639.            realMsg->header->id);
  1640.     }
  1641.     if ((realMsg->header->flags & RPC_BROADCAST) == 0) {
  1642.     errorMsg.header.id =         htonl(realMsg->header->id);
  1643.     errorMsg.header.byteOrder = realMsg->header->byteOrder;
  1644.     errorMsg.header.procNum =   htons(realMsg->header->procNum);
  1645.     errorMsg.header.flags =
  1646.         htons(realMsg->header->flags ^ (RPC_CALL|RPC_ERROR));
  1647.     errorMsg.header.length =    htonl(sizeof(stat));
  1648.     errorMsg.stat =             (Rpc_Stat)htonl(stat);
  1649.  
  1650.     while ((sendto(realMsg->sock, (char *)&errorMsg, sizeof(errorMsg), 0,
  1651.                (struct sockaddr *)realMsg->remote,
  1652.                sizeof(*realMsg->remote)) < 0) &&
  1653.            (errno == EINTR)) {
  1654.            ;
  1655.     }
  1656.     if (realMsg->e != (CacheEntry *)0) {
  1657.         realMsg->e->status = REPLY_SENT;
  1658.         realMsg->e->error = stat;
  1659.     }
  1660.     }
  1661. }
  1662.  
  1663. /*-
  1664.  *-----------------------------------------------------------------------
  1665.  * Rpc_Return --
  1666.  *    Send a reply to an RPC call.
  1667.  *
  1668.  * Results:
  1669.  *    None.
  1670.  *
  1671.  * Side Effects:
  1672.  *    A reply message is sent. Memory will be allocated to cache the
  1673.  *    reply data if this is the first reply for a message (the status
  1674.  *     of the CacheEntry for it is REPLY_PENDING) and the status of the
  1675.  *    cache entry upgraded to REPLY_SENT, with replySize and replyData
  1676.  *    set appropriately.
  1677.  *
  1678.  *-----------------------------------------------------------------------
  1679.  */
  1680. void
  1681. Rpc_Return(rpcMsg, length, data)
  1682.     Rpc_Message          rpcMsg;        /* Message to respond to */
  1683.     int                  length;        /* Length of data to return */
  1684.     Rpc_Opaque           data;        /* Data to return */
  1685. {
  1686.     register RpcMessage    *realMsg = (RpcMessage *)rpcMsg;
  1687.     struct msghdr     msg;
  1688.     struct iovec      iov[2];
  1689.     MsgHeader          header;
  1690.     int                  numBytes;
  1691.  
  1692.     if (rpcDebug) {
  1693.     printf("return on %d: %d bytes for procedure %d, id %u\n",
  1694.            realMsg->sock, length, realMsg->header->procNum,
  1695.            realMsg->header->id);
  1696.     }
  1697.     /*
  1698.      * First set up the header of the reply message. If too many data are
  1699.      * being passed back, an RPC_TOOBIG error is generated instead of the
  1700.      * reply. Note that we indicate the byteOrder for the message is the same
  1701.      * as that sent, since we byte-swap it to be the same.
  1702.      */
  1703.     header.id =           htonl(realMsg->header->id);
  1704.     header.byteOrder =        realMsg->header->byteOrder;
  1705.     header.procNum =         htons(realMsg->header->procNum);
  1706.     header.flags =         htons(realMsg->header->flags^(RPC_CALL|RPC_REPLY));
  1707.     if (length <= MAX_DATA_SIZE) {
  1708.     header.length =     htonl(length);
  1709.     } else {
  1710.     Rpc_Error(rpcMsg, RPC_TOOBIG);
  1711.     return;
  1712.     }
  1713.  
  1714.     /*
  1715.      * Handle caching and swapping: If there were actually reply data,
  1716.      * allocate new storage for them and copy them in, pointing the replyData
  1717.      * field of the CacheEntry at them, else set both the replyData and
  1718.      * replySize fields to 0. Mark the entry as REPLY_SENT.
  1719.      */
  1720.     if ((realMsg->e != (CacheEntry *)0) &&
  1721.     (realMsg->e->status == REPLY_PENDING))
  1722.     {
  1723.     realMsg->e->status = REPLY_SENT;
  1724.     
  1725.     if (length == 0) {
  1726.         if (rpcDebug) {
  1727.         printf("Marking zero-length reply\n");
  1728.         }
  1729.         realMsg->e->replySize = 0;
  1730.         realMsg->e->replyData = (Rpc_Opaque)0;
  1731.     } else {
  1732.         if ((header.byteOrder != RPC_MAGIC) &&
  1733.         (realMsg->server->swapReplyProc != Rpc_SwapNull))
  1734.         {
  1735.         if (rpcDebug) {
  1736.             printf("Swapping reply of %d bytes\n", length);
  1737.         }
  1738.         (* realMsg->server->swapReplyProc)(length,
  1739.                            (Rpc_Opaque)data,
  1740.                            realMsg->server->data);
  1741.         }
  1742.         
  1743.         if (rpcDebug) {
  1744.         printf("Marking reply of %d bytes\n", length);
  1745.         }
  1746.         realMsg->e->replySize = length;
  1747.         realMsg->e->replyData = (Rpc_Opaque)malloc(length);
  1748.         bcopy((char *)data, (char *)realMsg->e->replyData, length);
  1749.     }
  1750.     }
  1751.     
  1752.  
  1753.     /*
  1754.      * Then the I/O vector for the message (to avoid copies, of course)
  1755.      */
  1756.     iov[0].iov_base =         (caddr_t)&header;
  1757.     iov[0].iov_len =        sizeof(header);
  1758.     iov[1].iov_base =        (caddr_t)data;
  1759.     iov[1].iov_len =        length;
  1760.  
  1761.     /*
  1762.      * Finally the msghdr for the sendmsg call.
  1763.      */
  1764.     msg.msg_name =        (caddr_t)realMsg->remote;
  1765.     msg.msg_namelen =        sizeof(*realMsg->remote);
  1766.     msg.msg_iov =         iov;
  1767.     msg.msg_iovlen =        (length != 0) ? 2 : 1;
  1768.     msg.msg_accrights =        (caddr_t)0;
  1769.     msg.msg_accrightslen =  0;
  1770.  
  1771.     /*
  1772.      * Keep sending the message while the thing keeps being interrupted.
  1773.      */
  1774.     do {
  1775.     numBytes = sendmsg(realMsg->sock, &msg, 0);
  1776.     } while ((numBytes < 0) && (errno == EINTR));
  1777.  
  1778.     if (numBytes < 0) {
  1779.     if (rpcDebug) {
  1780.         perror("Rpc_Return: sendmsg");
  1781.     }
  1782.     }
  1783. }
  1784.  
  1785. /*-
  1786.  *-----------------------------------------------------------------------
  1787.  * Rpc_ServerCreate --
  1788.  *    Set a server for a <socket, procedure-number> pair. The server
  1789.  *    should be declared as follows:
  1790.  *        serverProc(fromPtr, msg, dataLen, data, serverData)
  1791.  *              struct sockaddr_in *fromPtr;
  1792.  *              Rpc_Opaque msg;
  1793.  *              int dataLen;
  1794.  *              Rpc_Opaque data;
  1795.  *              Rpc_Opaque serverData;
  1796.  *
  1797.  *    fromPtr points to the address of the sender of the request.
  1798.  *    msg is an opaque parameter that must be used to send a reply.
  1799.  *    dataLen is the number of bytes of data that came with the request.
  1800.  *    data is the data that were sent with the request.
  1801.  *    serverData is the piece of data supplied when the server was created.
  1802.  *
  1803.  *    data and serverData should not, of course, be opaque to the server...
  1804.  *
  1805.  *    The swap procedures should be declared as:
  1806.  *        swapProc(length, data, serverData)
  1807.  *        int        length;
  1808.  *        Rpc_Opaque    data;
  1809.  *        Rpc_Opaque      serverData;
  1810.  *
  1811.  *    data is the data to be swapped and length is its length. serverData
  1812.  *    is the same as for the serverProc call.
  1813.  *
  1814.  * Results:
  1815.  *    None.
  1816.  *
  1817.  * Side Effects:
  1818.  *    Any previous server is overridden.
  1819.  *
  1820.  *-----------------------------------------------------------------------
  1821.  */
  1822. void
  1823. Rpc_ServerCreate(sock, procNum, serverProc, swapArgsProc, swapReplyProc,
  1824.          serverData)
  1825.     int              sock;                /* Socket for server */
  1826.     Rpc_Proc       procNum;          /* Procedure number to serve */
  1827.     void          (*serverProc)();  /* Handler function to call */
  1828.     void          (*swapArgsProc)();/* Swapping function for args */
  1829.     void      (*swapReplyProc)();/* Swapping function for reply */
  1830.     Rpc_Opaque       serverData;        /* Datum to pass to server function */
  1831. {
  1832.     register RpcServer    *s;
  1833.  
  1834.     /*
  1835.      * Look for previous server record and use it if present
  1836.      */
  1837.     for (s = rpcServers[sock]; s != (RpcServer *)0; s = s->next) {
  1838.     if (s->procNum == procNum) {
  1839.         break;
  1840.     }
  1841.     }
  1842.     if (s == (RpcServer *)0) {
  1843.     /*
  1844.      * Didn't exist: create new record and link it into the list of
  1845.      * servers on the socket.
  1846.      */
  1847.     s = (RpcServer *)malloc(sizeof(RpcServer));
  1848.     s->next = rpcServers[sock];
  1849.     rpcServers[sock] = s;
  1850.     } else {
  1851.     /*
  1852.      * It did exist. Since we're zeroing out the cache, we want to
  1853.      * destroy previously-cached calls.
  1854.      */
  1855.     RpcCacheDestroy(s);
  1856.     }
  1857.  
  1858.     /*
  1859.      * Install new server in server record
  1860.      */
  1861.     s->procNum = procNum;
  1862.     s->serverProc = serverProc;
  1863.     s->swapArgsProc = swapArgsProc;
  1864.     s->swapReplyProc = swapReplyProc;
  1865.     s->data = serverData;
  1866.     bzero((char *)s->cache, sizeof(s->cache));
  1867.  
  1868.     /*
  1869.      * Install handler for stream.
  1870.      */
  1871.     Rpc_Watch(sock, RPC_READABLE, RpcHandleStream, (Rpc_Opaque)0);
  1872. }
  1873.  
  1874. /*-
  1875.  *-----------------------------------------------------------------------
  1876.  * Rpc_ServerDelete --
  1877.  *    Deletes the handler for the given procedure on the given socket.
  1878.  *
  1879.  * Results:
  1880.  *    None.
  1881.  *
  1882.  * Side Effects:
  1883.  *    If the handler was defined, it is removed from the list and freed.
  1884.  *    If no calls are pending on the socket and this is the last server
  1885.  *    for the socket, the socket is ignored in later selects.
  1886.  *
  1887.  *-----------------------------------------------------------------------
  1888.  */
  1889. void
  1890. Rpc_ServerDelete(sock, procNum)
  1891.     int        sock;     /* Socket handler is on */
  1892.     Rpc_Proc procNum;    /* Procedure number to stop handling */
  1893. {
  1894.     register RpcServer     *s;
  1895.     register RpcServer    **prev;
  1896.  
  1897.     prev = &rpcServers[sock];
  1898.     for (s = rpcServers[sock]; s != (RpcServer *)0; s = s->next) {
  1899.     if (s->procNum == procNum) {
  1900.         break;
  1901.     } else {
  1902.         prev = &s->next;
  1903.     }
  1904.     }
  1905.     if (s != (RpcServer *)0) {
  1906.     *prev = s->next;
  1907.     RpcCacheDestroy(s);
  1908.     free((char *)s);
  1909.     if ((rpcServers[sock] == (RpcServer *)0) &&
  1910.         (rpcCalls[sock] == (RpcCall *)0)) {
  1911.         Rpc_Ignore(sock);
  1912.     }
  1913.     }
  1914. }
  1915.  
  1916. /*-
  1917.  *-----------------------------------------------------------------------
  1918.  * Rpc_Call --
  1919.  *    Invoke a remote procedure on another machine.
  1920.  *
  1921.  * Results:
  1922.  *    RPC_SUCCESS if the call went ok.
  1923.  *    RPC_TIMEDOUT if the server couldn't be reached in the given time.
  1924.  *    RPC_CANTSEND if couldn't send the message, for some reason.
  1925.  *
  1926.  * Side Effects:
  1927.  *    Messages are sent...
  1928.  *
  1929.  *-----------------------------------------------------------------------
  1930.  */
  1931. Rpc_Stat
  1932. Rpc_Call(sock, server, procNum, inLength, inData, outLength, outData,
  1933.      numRetries, retry)
  1934.     int                  sock;        /* Socket on which to call */
  1935.     struct sockaddr_in    *server;    /* Complete address of server */
  1936.     Rpc_Proc           procNum;    /* Procedure number to call */
  1937.     int                  inLength;   /* Length of data for call */
  1938.     Rpc_Opaque           inData;        /* Data for call */
  1939.     int                  outLength;  /* Expected length of results. If 0,
  1940.                      * call will be acknowledged before it
  1941.                      * is handled on remote side. */
  1942.     Rpc_Opaque           outData;    /* Place to store results of call */
  1943.     int                  numRetries; /* Number of times to try the call before
  1944.                      * timing out */
  1945.     struct timeval    *retry;        /* Interval at which to retry */
  1946. {
  1947.     struct iovec      iov[2];
  1948.     MsgHeader           header;
  1949.     RpcCall          call;
  1950.  
  1951.     if (inLength > MAX_DATA_SIZE) {
  1952.     return (RPC_TOOBIG);
  1953.     }
  1954.     call.message.msg_name =          (caddr_t)server;
  1955.     call.message.msg_namelen =        sizeof(*server);
  1956.     call.message.msg_iov =           iov;
  1957.     call.message.msg_iovlen =        (inLength != 0) ? 2 : 1;
  1958.     call.message.msg_accrights =     (caddr_t)0;
  1959.     call.message.msg_accrightslen =  0;
  1960.  
  1961.     call.sock =                 sock;
  1962.     call.numRetries =                numRetries;
  1963.     call.id =                      RpcUniqueID();
  1964.     call.replyLen =                outLength;
  1965.     call.reply =                  outData;
  1966.     call.resend =                 Rpc_EventCreate(retry,
  1967.                             RpcResend,
  1968.                             (Rpc_Opaque)&call);
  1969.     call.replied =                False;
  1970.     call.next =                   rpcCalls[sock];
  1971.  
  1972.     rpcCalls[sock] = &call;
  1973.  
  1974.     header.id =                      htonl(call.id);
  1975.     header.byteOrder =            RPC_MAGIC;
  1976.     header.procNum =                htons(procNum);
  1977.     header.flags =                htons(RPC_CALL);
  1978.     header.length =                htonl(inLength);
  1979.  
  1980.     iov[0].iov_base =                 (caddr_t)&header;
  1981.     iov[0].iov_len =                 sizeof(header);
  1982.     iov[1].iov_base =                 (caddr_t)inData;
  1983.     iov[1].iov_len =                 inLength;
  1984.  
  1985.     /*
  1986.      * Set to catch responses and send initial packet.
  1987.      */
  1988.     Rpc_Watch(sock, RPC_READABLE, RpcHandleStream, (Rpc_Opaque)0);
  1989.     (void)RpcResend(&call);
  1990.  
  1991.     while (!call.replied) {
  1992.     Rpc_Wait();
  1993.     }
  1994.  
  1995.     /*
  1996.      * Cleanup: nuke the resend event and ignore the socket if we aren't
  1997.      * paying attention to it anymore (no servers for it and no calls pending
  1998.      * on it)
  1999.      */
  2000.     Rpc_EventDelete(call.resend);
  2001.  
  2002.     if ((rpcServers[sock] == (RpcServer *)0) &&
  2003.     (rpcCalls[sock] == (RpcCall *)0))
  2004.     {
  2005.     Rpc_Ignore(sock);
  2006.     }
  2007.     return(call.status);
  2008. }
  2009.  
  2010. /*-
  2011.  *-----------------------------------------------------------------------
  2012.  * RpcGetNetworks --
  2013.  *    Return the network address(es) of this machine (as suitable for
  2014.  *    broadcasting).
  2015.  *
  2016.  * Results:
  2017.  *    The broadcast address(es) for the machine and the number of
  2018.  *    networks the machine is on.
  2019.  *
  2020.  * Side Effects:
  2021.  *    None.
  2022.  *
  2023.  *-----------------------------------------------------------------------
  2024.  */
  2025. static void
  2026. RpcGetNetworks(sock, maxNets, networks, numNetsPtr)
  2027.     int                  sock;        /* Socket with which to find the networks*/
  2028.     int            maxNets;    /* Maximum number of networks supported */
  2029.     struct sockaddr_in    *networks;  /* Where to store the addresses. These are
  2030.                      * full sockaddr_in's to make life easier
  2031.                      * for the caller */
  2032.     int                    *numNetsPtr;/* Place to store the actual number of
  2033.                      * networks */
  2034. {
  2035.     struct ifconf     ifc;        /* Record of all known network
  2036.                      * interfaces */
  2037.     struct ifreq      ifreq,        /* Current network interface */
  2038.             *ifr;        /* Pointer into ifc of current interface */
  2039.     int                  n;          /* Number of networks left to check */
  2040.     char              buf[1024];  /* Buffer for fetching interface info */
  2041.     struct in_addr     addr;        /* Actual broadcast address */
  2042.     int                    i;          /* Current broadcast network (entry in
  2043.                      * "networks") */
  2044.  
  2045.     ifc.ifc_len = sizeof(buf);
  2046.     ifc.ifc_buf = buf;
  2047.     i = 0;
  2048.  
  2049.     if (ioctl(sock, SIOCGIFCONF, (char *)&ifc) >= 0) {
  2050.     /*
  2051.      * First fetch info for all the networks known
  2052.      */
  2053.     ifr = ifc.ifc_req;
  2054.  
  2055.     /*
  2056.      * Step through each network known, looking for those that are up and
  2057.      * have broadcasting enabled
  2058.      */
  2059.     for (n = ifc.ifc_len/sizeof (struct ifreq);
  2060.          (i < maxNets) && (n > 0);
  2061.          n--, ifr++)
  2062.     {
  2063.         /*
  2064.          * Copy so we can mangle the address to get the interface flags
  2065.          */
  2066.         ifreq = *ifr;
  2067.  
  2068.         /*
  2069.          * Find the state of the interface
  2070.          */
  2071.         if (ioctl(sock, SIOCGIFFLAGS, (char *)&ifreq) < 0) {
  2072.         if (rpcDebug) {
  2073.             perror("RpcGetNetworks: ioctl (get interface flags)");
  2074.         }
  2075.         continue;
  2076.         }
  2077.  
  2078.         if ((ifreq.ifr_flags & IFF_BROADCAST) &&
  2079.         (ifreq.ifr_flags & IFF_UP) &&
  2080.         (ifr->ifr_addr.sa_family == AF_INET))
  2081.         {
  2082.         /*
  2083.          * Good stuff, Maynard. We can broadcast on it and it's up.
  2084.          * Now figure out the actual address to use for broadcasting.
  2085.          * The way this works is:
  2086.          *  - If the system supports broadcast addresses and this
  2087.          *    interface has one, we use that.
  2088.          *  - If the system supports netmasks and the interface has
  2089.          *    one, we take the interface's address and mask out
  2090.          *    the bits indicated by the mask, assuming that to be
  2091.          *    the broadcast address.
  2092.          *  - As a last resort, we use inet_netof to find the network
  2093.          *    part of the interface's address and assume we can
  2094.          *    broadcast there.
  2095.          */
  2096.         struct sockaddr_in *sin;    /* Pointer into ifreq for ease
  2097.                          * of reference */
  2098.             
  2099.         addr = ((struct sockaddr_in *)&ifr->ifr_addr)->sin_addr;
  2100.         sin = (struct sockaddr_in *)&ifreq.ifr_addr;
  2101. #ifdef SIOCGIFBRDADDR
  2102.         if (ioctl(sock, SIOCGIFBRDADDR, &ifreq) == 0) {
  2103.             /*
  2104.              * If the interface has a broadcast address associated
  2105.              * with it, use that.
  2106.              */
  2107.             addr = sin->sin_addr;
  2108.         } else
  2109. #endif SIOCGIFBRDADDR
  2110. #ifdef SIOCGIFNETMASK
  2111.             if (ioctl(sock, SIOCGIFNETMASK, &ifreq) == 0) {
  2112.             /*
  2113.              * If the interface has a netmask defined, use
  2114.              * that mask to determine the network address for
  2115.              * broadcasting. Both addr and ifreq.ifr_addr are
  2116.              * in network-order, so no need to convert...
  2117.              */
  2118.             addr.s_addr &= sin->sin_addr.s_addr;
  2119.             } else {
  2120.             addr = inet_makeaddr(inet_netof(addr), INADDR_ANY);
  2121.             }
  2122. #else
  2123.         addr = inet_makeaddr(inet_netof(addr), INADDR_ANY);
  2124. #endif SIOCGIFNETMASK
  2125.  
  2126.         networks[i].sin_addr = addr;
  2127.         i++;
  2128.         }
  2129.     }
  2130.     }
  2131.  
  2132.     /*
  2133.      * Return the number of networks actually found.
  2134.      */
  2135.     *numNetsPtr = i;
  2136. }
  2137.  
  2138. /*-
  2139.  *-----------------------------------------------------------------------
  2140.  * Rpc_Broadcast --
  2141.  *    Broadcast an rpc call. If the server's address (server->sin_addr)
  2142.  *    is not INADDR_ANY, then only that network is used. (if the address
  2143.  *    isn't a network, this degenerates to an Rpc_Call with a callback
  2144.  *    function).
  2145.  *    handleProc should be declared:
  2146.  *        Bool
  2147.  *        handleProc(fromPtr, dataLen, data)
  2148.  *              struct sockaddr_in *fromPtr;
  2149.  *              int dataLen;
  2150.  *              Rpc_Opaque data;
  2151.  *    It should return True if broadcasting should stop and False if it
  2152.  *    should continue.
  2153.  *
  2154.  * Results:
  2155.  *    RPC_SUCCESS if at least one reply was received.
  2156.  *    RPC_TIMEDOUT if no reply was received.
  2157.  *
  2158.  * Side Effects:
  2159.  *    A call is broadcast over the network.
  2160.  *    outData is overwritten.
  2161.  *
  2162.  * Notes:
  2163.  *    Only a single network is supported for now. If the machine is
  2164.  *    on multiple networks, you will have to make separate Rpc_Broadcast
  2165.  *    calls specifying each network in turn.
  2166.  *
  2167.  *    Multiple networks can be handled by queueing multiple call
  2168.  *    messages -- one per network. The problem would be tracking them
  2169.  *    all.
  2170.  *
  2171.  *-----------------------------------------------------------------------
  2172.  */
  2173. Rpc_Stat
  2174. Rpc_Broadcast(sock, server, procNum, inLength, inData, outLength, outData,
  2175.           numRetries, retry, handleProc, handleData)
  2176.     int                  sock;                /* Socket on which to call */
  2177.     struct sockaddr_in    *server;            /* Complete address of server.
  2178.                          * If the sin_addr field is
  2179.                          * INADDR_ANY, broadcast to all
  2180.                          * attached networks. */
  2181.     Rpc_Proc           procNum;            /* Procedure number to call */
  2182.     int                  inLength;           /* Length of data for call */
  2183.     Rpc_Opaque           inData;                /* Data for call */
  2184.     int                  outLength;          /* Expected length of results. If
  2185.                          * 0, call will be acknowledged
  2186.                          * before it is handled on remote
  2187.                          * side. */
  2188.     Rpc_Opaque           outData;            /* Place to store results of
  2189.                          * call */
  2190.     int                  numRetries;         /* Number of times to try the call
  2191.                          * before timing out */
  2192.     struct timeval    *retry;                /* Interval at which to retry */
  2193.     Boolean              (*handleProc)();    /* Function to handle responses */
  2194.     Rpc_Opaque            handleData;         /* Extra data to pass to
  2195.                          * handleProc */
  2196. {
  2197.     struct sockaddr_in     networks[MAXNETS];  /* Addresses to which to send */
  2198.     RpcCall          calls[MAXNETS];        /* Call records for above */
  2199.     MsgHeader           header[MAXNETS];    /* Headers for above */
  2200.     struct iovec      iov[MAXNETS][2];    /* sendmsg vectors for above */
  2201.     int                    numNets;            /* Number of entries in above */
  2202.     int                  numResponses = 0;   /* Number of responses received
  2203.                          * so far */
  2204.     int            one = 1;            /* For setsockopt */
  2205.     int                    i;                  /* Index into calls et al */
  2206.     Rpc_Stat            result=RPC_TIMEDOUT;/* Our return value */
  2207.     RpcServer            cache;                /* Fake server record for caching
  2208.                          * responses */
  2209.     if (inLength > MAX_DATA_SIZE) {
  2210.     return (RPC_TOOBIG);
  2211.     }
  2212.     
  2213.     /*
  2214.      * If the server isn't indicated (its address is INADDR_ANY), we fetch
  2215.      * the addresses of all the networks on which this host resides (up to
  2216.      * MAXNETS).
  2217.      */
  2218.     if (server->sin_addr.s_addr == INADDR_ANY) {
  2219.     RpcGetNetworks(sock, MAXNETS, networks, &numNets);
  2220.     } else {
  2221.     /*
  2222.      * Server given -- only broadcast to one place.
  2223.      */
  2224.     networks[0] = *server;
  2225.     numNets = 1;
  2226.     }
  2227.     
  2228. #ifdef SO_BROADCAST
  2229.     /*
  2230.      * Enable broadcasting on this socket. If we can't do that, we can't
  2231.      * send...
  2232.      */
  2233.     if (setsockopt(sock, SOL_SOCKET, SO_BROADCAST, &one, sizeof(one)) < 0){
  2234.     if (rpcDebug) {
  2235.         perror("setsockopt");
  2236.         printf("sock = %d\n", sock);
  2237.     }
  2238.     return(RPC_CANTSEND);
  2239.     }
  2240. #endif SO_BROADCAST
  2241.  
  2242.     /*
  2243.      * For each network this host is on, make up a unique RpcCall structure,
  2244.      * with proper address, id, header, etc. Link it in and issue the initial
  2245.      * call in this loop.
  2246.      */
  2247.     for (i = 0; i < numNets; i++) {
  2248.     /*
  2249.      * Copy family and destination port from the server record we were
  2250.      * given. The sin_addr field is filled in by RpcGetNetworks
  2251.      */
  2252.     networks[i].sin_family =            server->sin_family;
  2253.     networks[i].sin_port =                server->sin_port;
  2254.     
  2255.     /*
  2256.      * Set up message header first.
  2257.      */
  2258.     calls[i].message.msg_name =          (caddr_t)&networks[i];
  2259.     calls[i].message.msg_namelen =        sizeof(networks[i]);
  2260.     calls[i].message.msg_iov =           iov[i];
  2261.     calls[i].message.msg_iovlen =        (inLength != 0) ? 2 : 1;
  2262.     calls[i].message.msg_accrights =    (caddr_t)0;
  2263.     calls[i].message.msg_accrightslen = 0;
  2264.     
  2265.     /*
  2266.      * Initialize the rest of our parameters. Note the message must have
  2267.      * a unique ID so it can be found when someone on the network responds.
  2268.      */
  2269.     calls[i].sock =                 sock;
  2270.     calls[i].numRetries =                numRetries;
  2271.     calls[i].id =                      RpcUniqueID();
  2272.     calls[i].replyLen =                outLength;
  2273.     calls[i].reply =                  outData;
  2274.     calls[i].resend =                 Rpc_EventCreate(retry,
  2275.                                 RpcResend,
  2276.                                 (Rpc_Opaque)&calls[i]);
  2277.     calls[i].replied =                 False;
  2278.  
  2279.     /*
  2280.      * Link it into the chain of calls for this socket
  2281.      */
  2282.     calls[i].next =                rpcCalls[sock];
  2283.     rpcCalls[sock] =            &calls[i];
  2284.     
  2285.     /*
  2286.      * Set up the header to go with the call.
  2287.      */
  2288.     header[i].id =                      htonl(calls[i].id);
  2289.     header[i].byteOrder =            RPC_MAGIC;
  2290.     header[i].procNum =                htons(procNum);
  2291.     header[i].flags =                htons(RPC_CALL|RPC_BROADCAST);
  2292.     header[i].length =                htonl(inLength);
  2293.     
  2294.     /*
  2295.      * Set up the I/O vector for the message (to avoid copying)
  2296.      */
  2297.     iov[i][0].iov_base =                 (caddr_t)&header[i];
  2298.     iov[i][0].iov_len =                 sizeof(header[i]);
  2299.     iov[i][1].iov_base =                 (caddr_t)inData;
  2300.     iov[i][1].iov_len =                 inLength;
  2301.  
  2302.     /*
  2303.      * Dispatch the initial broadcast call
  2304.      */
  2305.     (void)RpcResend(&calls[i]);
  2306.     }
  2307.  
  2308.     /*
  2309.      * Initialize the "server" cache
  2310.      */
  2311.     bzero(cache.cache, sizeof(cache.cache));
  2312.     
  2313.     /*
  2314.      * Watch for replies on this socket
  2315.      */
  2316.     Rpc_Watch(sock, RPC_READABLE, RpcHandleStream, (Rpc_Opaque)0);
  2317.  
  2318.     /*
  2319.      * Loop for the entire broadcast period, reinstalling the call each
  2320.      * time it receives a response until either all the calls report an
  2321.      * error or the handler function returns True. If any of the errors
  2322.      * isn't RPC_TIMEDOUT, record it as the result to return.
  2323.      */
  2324.     while(1) {
  2325.     int    failures;   /* The number of failed calls */
  2326.         
  2327.     /*
  2328.      * Wait for something to happen
  2329.      */
  2330.     Rpc_Wait();
  2331.     
  2332.     /*
  2333.      * Check all our calls for success or failure.
  2334.      */
  2335.     for (failures = i = 0; i < numNets; i++) {
  2336.         if (calls[i].replied) {
  2337.         if (calls[i].status == RPC_SUCCESS) {
  2338.             CacheEntry    *e;
  2339.             Boolean     new;
  2340.  
  2341.             /*
  2342.              * Check the response cache to see if this party's
  2343.              * been heard from before.
  2344.              */
  2345.             e = RpcCacheFind(&cache, &calls[i].remote, (RpcID)0,
  2346.                      True, &new);
  2347.  
  2348.             if (new) {
  2349.             /*
  2350.              * Note another successful response
  2351.              */
  2352.             numResponses += 1;
  2353.  
  2354.             /*
  2355.              * No point in flushing the cache entry (don't
  2356.              * want it flushed, in fact), so delete the
  2357.              * flush event.
  2358.              */
  2359.             Rpc_EventDelete(e->flushEvent);
  2360.             e->flushEvent = (Rpc_Event)NULL;
  2361.             
  2362.  
  2363.             if ((*handleProc)(&calls[i].remote,
  2364.                       calls[i].replyLen,
  2365.                       outData,
  2366.                       handleData))
  2367.             {
  2368.                 /*
  2369.                  * Handler returned True -- abort the whole
  2370.                  * process.
  2371.                  */
  2372.                 goto done_broadcast;
  2373.             }
  2374.             }
  2375.             /*
  2376.              * Requeue and reinitialize the call. The event is
  2377.              * still registered, so the call will continue to be
  2378.              * sent.
  2379.              */
  2380.             calls[i].replied     = False;
  2381.             calls[i].replyLen     = outLength;
  2382.             calls[i].next       = rpcCalls[sock];
  2383.  
  2384.             rpcCalls[sock] = &calls[i];
  2385.         } else {
  2386.             if (calls[i].status != RPC_TIMEDOUT) {
  2387.             /*
  2388.              * Didn't timeout -- record that as our return code.
  2389.              * XXX: This shouldn't abort the broadcast on
  2390.              * that interface, though...
  2391.              */
  2392.             result = calls[i].status;
  2393.             }
  2394.             /*
  2395.              * Note another failed call.
  2396.              */
  2397.             failures++;
  2398.         }
  2399.         }
  2400.     }
  2401.     if (failures == numNets) {
  2402.         /*
  2403.          * All done -- get out of here
  2404.          */
  2405.         break;
  2406.     }
  2407.     }
  2408.  
  2409. done_broadcast:
  2410.     /*
  2411.      * Delete the resend event for each call.
  2412.      */
  2413.     for (i = 0; i < numNets; i++) {
  2414.     Rpc_EventDelete(calls[i].resend);
  2415.     }
  2416.  
  2417.     /*
  2418.      * Nuke the cache.
  2419.      */
  2420.     RpcCacheDestroy(&cache);
  2421.     
  2422.     /*
  2423.      * If nothing left on the socket, ignore it
  2424.      */
  2425.     if ((rpcServers[sock] == (RpcServer *)0) &&
  2426.     (rpcCalls[sock] == (RpcCall *)0))
  2427.     {
  2428.     Rpc_Ignore(sock);
  2429.     }
  2430.     
  2431.     if (result == RPC_TIMEDOUT) {
  2432.     /*
  2433.      * If timed out, return success as long as we got something from
  2434.      * someone.
  2435.      */
  2436.     return(numResponses ? RPC_SUCCESS : RPC_TIMEDOUT);
  2437.     } else {
  2438.     /*
  2439.      * Worse error -- return it.
  2440.      */
  2441.     return(result);
  2442.     }
  2443. }
  2444.  
  2445. /*-
  2446.  *-----------------------------------------------------------------------
  2447.  * Rpc_Run --
  2448.  *    Function to run the Rpc system. This never returns. The program
  2449.  *    is expected to revolve around the system, being completely
  2450.  *    event-driven.
  2451.  *
  2452.  * Results:
  2453.  *    None.
  2454.  *
  2455.  * Side Effects:
  2456.  *    Not really.
  2457.  *
  2458.  *-----------------------------------------------------------------------
  2459.  */
  2460. void
  2461. Rpc_Run()
  2462. {
  2463.     while(1) {
  2464.     Rpc_Wait();
  2465.     }
  2466. }
  2467.  
  2468. static jmp_buf      acceptBuf;
  2469.  
  2470. /*-
  2471.  *-----------------------------------------------------------------------
  2472.  * RpcTcpTimeout --
  2473.  *    An accept on a stream has timed out -- don't keep us waiting.
  2474.  *
  2475.  * Results:
  2476.  *    None.
  2477.  *
  2478.  * Side Effects:
  2479.  *    Performs a longjmp through acceptBuf.
  2480.  *    
  2481.  *-----------------------------------------------------------------------
  2482.  */
  2483. static void
  2484. RpcTcpTimeout()
  2485. {
  2486.     longjmp(acceptBuf, 1);
  2487. }
  2488.  
  2489. /*-
  2490.  *-----------------------------------------------------------------------
  2491.  * RpcTcpAccept --
  2492.  *    Accept on a passive TCP stream, duplicating all servers on the
  2493.  *    passive stream to the active one.
  2494.  *
  2495.  * Results:
  2496.  *    None.
  2497.  *
  2498.  * Side Effects:
  2499.  *    A new stream is opened and rpcServer structures allocated to hold
  2500.  *    those of the passive stream.
  2501.  *
  2502.  *-----------------------------------------------------------------------
  2503.  */
  2504. static void
  2505. RpcTcpAccept(stream)
  2506.     int                  stream;        /* Passive stream with connection pending */
  2507. {
  2508.     register int      newStream;  /* New active stream */
  2509.     register RpcServer    *serv;        /* Original server record */
  2510.     register RpcServer    *newServ;   /* Duplicate server record */
  2511.     register RpcServer    **prevServ; /* Previous duplicate's next field */
  2512.     struct sockaddr_in    remote;
  2513.     int                  len;
  2514.  
  2515.     signal(SIGALRM, RpcTcpTimeout);
  2516.  
  2517.     alarm(5);
  2518.     if (setjmp(acceptBuf) == 0) {
  2519.     len = sizeof(remote);
  2520.     newStream = accept(stream, (struct sockaddr *)&remote, &len);
  2521.     } else {
  2522.     newStream = -1;
  2523.     }
  2524.     alarm(0);
  2525.     signal(SIGALRM, SIG_DFL);
  2526.  
  2527.     if (newStream < 0) {
  2528.     if (rpcDebug) {
  2529.         perror("accept");
  2530.     }
  2531.     return;
  2532.     }
  2533.     /*
  2534.      * First set to handle the new stream when it becomes readable, then
  2535.      * duplicate the list of RpcServers attached to the passive stream. This
  2536.      * is faster than calling Rpc_ServerCreate because it doesn't have to check
  2537.      * down the list for each server -- it just duplicates it.
  2538.      */
  2539.     Rpc_Watch(newStream, RPC_READABLE, RpcHandleStream, (Rpc_Opaque)0);
  2540.     for (serv = rpcServers[stream], prevServ = &rpcServers[newStream];
  2541.      serv != (RpcServer *)0;
  2542.      serv = serv->next) {
  2543.          newServ = (RpcServer *)malloc(sizeof(RpcServer));
  2544.          newServ->procNum = serv->procNum;
  2545.          newServ->serverProc = serv->serverProc;
  2546.          newServ->swapArgsProc = serv->swapArgsProc;
  2547.          newServ->swapReplyProc = serv->swapReplyProc;
  2548.          newServ->data = serv->data;
  2549.          bzero((char *)newServ->cache, sizeof(newServ->cache));
  2550.          *prevServ = newServ;
  2551.          prevServ = &newServ->next;
  2552.     }
  2553.     *prevServ = (RpcServer *)0;
  2554. }
  2555.  
  2556. /*-
  2557.  *-----------------------------------------------------------------------
  2558.  * Rpc_TcpCreate --
  2559.  *    Create a tcp socket for rpc. The socket will be a passive service
  2560.  *    socket ready for connections if 'service' is True. If 'service'
  2561.  *    is False, the socket remains unbound and unconnected. It will be
  2562.  *    bound and connected when the first Rpc_Call is made on it.
  2563.  *
  2564.  * Results:
  2565.  *    The file descriptor of the open socket.
  2566.  *
  2567.  * Side Effects:
  2568.  *    None.
  2569.  *
  2570.  *-----------------------------------------------------------------------
  2571.  */
  2572. int
  2573. Rpc_TcpCreate(service, port)
  2574.     Boolean           service;    /* True if socket will be used for handling
  2575.                      * rpc calls */
  2576.     unsigned short    port;        /* Port number to use if socket is a
  2577.                      * service socket. */
  2578. {
  2579.     register int      s;
  2580.  
  2581.     s = socket(AF_INET, SOCK_STREAM, 0);
  2582.     if (s < 0) {
  2583.     return (-1);
  2584.     }
  2585.     if (service) {
  2586.     struct sockaddr_in  sin;
  2587.  
  2588.     /*
  2589.      * The address has a sin_zero field that must be zero, for some reason
  2590.      * known only to the demented engineers who wrote the code.
  2591.      */
  2592.     bzero(&sin, sizeof(sin));
  2593.  
  2594.     sin.sin_family = AF_INET;
  2595.     sin.sin_port = htons(port);
  2596.     sin.sin_addr.s_addr = htonl(INADDR_ANY);
  2597.     if (bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
  2598.         if (rpcDebug) {
  2599.         perror("bind");
  2600.         }
  2601.         (void)close(s);
  2602.         return (-1);
  2603.     }
  2604.     if (listen(s, 5) < 0) {
  2605.         if (rpcDebug) {
  2606.         perror("listen");
  2607.         }
  2608.         (void)close(s);
  2609.         return(-1);
  2610.     }
  2611.     }
  2612.     return(s);
  2613. }
  2614.  
  2615. /*-
  2616.  *-----------------------------------------------------------------------
  2617.  * Rpc_UdpCreate --
  2618.  *    Create a UDP socket that may be used for service or calling.
  2619.  *
  2620.  * Results:
  2621.  *    The socket created.
  2622.  *
  2623.  * Side Effects:
  2624.  *    A socket is created and its address bound. If service is True,
  2625.  *    the given port is used for its address, otherwise it is assigned
  2626.  *    one by the system. Note each UDP socket used for calling must
  2627.  *    have a port bound to it or the call will never receive a reply,
  2628.  *    as the port will be assigned when the sendto() call is issued and
  2629.  *    forgotten immediately thereafter.
  2630.  *
  2631.  *-----------------------------------------------------------------------
  2632.  */
  2633. int
  2634. Rpc_UdpCreate(service, port)
  2635.     Boolean           service;        /* True if socket will be used to
  2636.                      * service rpc calls */
  2637.     unsigned short    port;            /* Port number to use for socket, if
  2638.                      * service is True */
  2639. {
  2640.     register int      s;
  2641.     struct sockaddr_in    sin;
  2642.  
  2643.     s = socket(AF_INET, SOCK_DGRAM, 0);
  2644.     if (s < 0) {
  2645.     return(-1);
  2646.     }
  2647.  
  2648.     /*
  2649.      * The ISI has a sin_zero field that must be zero, for some reason
  2650.      * known only to the demented engineers who programmed the thing.
  2651.      */
  2652.     bzero(&sin, sizeof(sin));
  2653.  
  2654.     sin.sin_family = AF_INET;
  2655.     sin.sin_port = htons(service ? port : 0);
  2656.     sin.sin_addr.s_addr = htonl(INADDR_ANY);
  2657.     if (bind(s, (struct sockaddr *)&sin, sizeof(sin)) < 0) {
  2658.     (void)close(s);
  2659.     return(-1);
  2660.     }
  2661.     return (s);
  2662. }
  2663.  
  2664. /*-
  2665.  *-----------------------------------------------------------------------
  2666.  * Rpc_Debug --
  2667.  *    Set debugging state to 'debug'. If True, debug messages are
  2668.  *    printed to track the operation of rpc system.
  2669.  *
  2670.  * Results:
  2671.  *    None.
  2672.  *
  2673.  * Side Effects:
  2674.  *    rpcDebug's value is altered.
  2675.  *
  2676.  *-----------------------------------------------------------------------
  2677.  */
  2678. void
  2679. Rpc_Debug(debug)
  2680.     Boolean           debug;
  2681. {
  2682.     rpcDebug = debug;
  2683. }
  2684.  
  2685. /*-
  2686.  *-----------------------------------------------------------------------
  2687.  * Rpc_ErrorMessage --
  2688.  *    Return a string that describes the given error code.
  2689.  *
  2690.  * Results:
  2691.  *    See above.
  2692.  *
  2693.  * Side Effects:
  2694.  *    None.
  2695.  *
  2696.  *-----------------------------------------------------------------------
  2697.  */
  2698. char *
  2699. Rpc_ErrorMessage(stat)
  2700.     Rpc_Stat          stat;
  2701. {
  2702.     static char       *messages[] = {
  2703.     "Call was successful",
  2704.     "Couldn't send message",
  2705.         "Call timed out",
  2706.         "Arguments/results too big",
  2707.         "No such procedure",
  2708.         "Access denied",
  2709.         "Invalid argument(s)",
  2710.         "Remote system error",
  2711.     };
  2712.     int                  index = (int)stat;
  2713.  
  2714.     if (index > sizeof(messages)/sizeof(char *)) {
  2715.     return ("Unknown error");
  2716.     } else {
  2717.     return (messages[index]);
  2718.     }
  2719. }
  2720.  
  2721. /*-
  2722.  *-----------------------------------------------------------------------
  2723.  * Rpc_Reset --
  2724.  *    Reset the RPC system to its base startup state.
  2725.  *
  2726.  * Results:
  2727.  *    None.
  2728.  *
  2729.  * Side Effects:
  2730.  *    All streams are ignored. All servers are destroyed. All events
  2731.  *    are deleted. All calls are terminated.
  2732.  *
  2733.  *-----------------------------------------------------------------------
  2734.  */
  2735. void
  2736. Rpc_Reset()
  2737. {
  2738.     register int      stream;
  2739.     register RpcCall    *call;
  2740.     register RpcServer    *server;
  2741.     register RpcEvent    *event;
  2742.  
  2743.     FD_ZERO(&rpc_readMask);
  2744.     FD_ZERO(&rpc_writeMask);
  2745.     FD_ZERO(&rpc_exceptMask);
  2746.  
  2747.     for (stream = 0; stream < FD_SETSIZE; stream++) {
  2748.     streams[stream].state = 0;
  2749.     for (call = rpcCalls[stream];
  2750.          call != (RpcCall *)0;
  2751.          call = call->next) {
  2752.          call->replied = True;
  2753.          call->status = RPC_TIMEDOUT;
  2754.     }
  2755.     for (server = rpcServers[stream];
  2756.          server != (RpcServer *)0;
  2757.          server = server->next) {
  2758.          RpcCacheDestroy(server);
  2759.          free((char *)server);
  2760.     }
  2761.     }
  2762.  
  2763.     for (event = events; event != (RpcEvent *)0; event = events) {
  2764.     events = event->next;
  2765.     free((char *)event);
  2766.     }
  2767. }
  2768.  
  2769. /*-
  2770.  *-----------------------------------------------------------------------
  2771.  * Rpc_IsLocal --
  2772.  *    See if passed address originated with the local machine.
  2773.  *    
  2774.  * Results:
  2775.  *    True if it did, False if it didn't
  2776.  *    
  2777.  *
  2778.  * Side Effects:
  2779.  *    None
  2780.  *    
  2781.  *-----------------------------------------------------------------------
  2782.  */
  2783. Boolean
  2784. Rpc_IsLocal(addrPtr)
  2785.     struct sockaddr_in    *addrPtr;
  2786. {
  2787.     static struct sockaddr_in    locals[MAXNETS];
  2788.     static int                    numLocals = 0;
  2789.     int                            i;
  2790.  
  2791.     if (numLocals == 0) {
  2792.     /*
  2793.      * Need to find all the local address. Do this only once.
  2794.      * Note that the socket we use needn't be bound to anything --
  2795.      * it's just a focal point for an ioctl or two.
  2796.      */
  2797.     struct ifconf     ifc;        /* Record of all known network
  2798.                      * interfaces */
  2799.     struct ifreq      ifreq,        /* Current network interface */
  2800.             *ifr;        /* Pointer into ifc of current interface */
  2801.     int            n;          /* Number of interfaces left to check */
  2802.     char        buf[1024];  /* Buffer for fetching interface info */
  2803.     int         s = socket(AF_INET, SOCK_DGRAM, 0);
  2804.  
  2805.     ifc.ifc_len = sizeof(buf);
  2806.     ifc.ifc_buf = buf;
  2807.  
  2808.     if (ioctl(s, SIOCGIFCONF, (char *)&ifc) >= 0) {
  2809.         /*
  2810.          * First fetch info for all the networks known
  2811.          */
  2812.         ifr = ifc.ifc_req;
  2813.  
  2814.         /*
  2815.          * Step through each network known, looking for those that are up.
  2816.          */
  2817.         for (n = ifc.ifc_len/sizeof (struct ifreq);
  2818.          (numLocals < MAXNETS) && (n > 0);
  2819.          n--, ifr++)
  2820.         {
  2821.         /*
  2822.          * Copy so we can mangle the address to get the interface flags
  2823.          */
  2824.         ifreq = *ifr;
  2825.  
  2826.         /*
  2827.          * Find the state of the interface
  2828.          */
  2829.         if (ioctl(s, SIOCGIFFLAGS, (char *)&ifreq) < 0) {
  2830.             if (rpcDebug) {
  2831.             perror("Rpc_IsLocal: ioctl (get interface flags)");
  2832.             }
  2833.             continue;
  2834.         }
  2835.  
  2836.         if ((ifreq.ifr_flags & IFF_UP) &&
  2837.             (ifr->ifr_addr.sa_family == AF_INET))
  2838.         {
  2839.             /*
  2840.              * Good stuff, Maynard. It's up and using INET addressing
  2841.              */
  2842.             locals[numLocals] = *(struct sockaddr_in *)&ifr->ifr_addr;
  2843.             numLocals++;
  2844.         }
  2845.         }
  2846.     }
  2847.  
  2848.     (void)close(s);
  2849.     }
  2850.  
  2851.     /*
  2852.      * Check all known local addresses against the passed one, returning
  2853.      * True on first match. This handles localhost too, since we've got
  2854.      * the address for interface lo0.
  2855.      */
  2856.     for (i = 0; i < numLocals; i++) {
  2857.     if (addrPtr->sin_addr.s_addr == locals[i].sin_addr.s_addr) {
  2858.         return(True);
  2859.     }
  2860.     }
  2861.     /*
  2862.      * T'ain't one of ours.
  2863.      */
  2864.     return(False);
  2865. }
  2866. /*-
  2867.  *-----------------------------------------------------------------------
  2868.  * Rpc_SwapLong --
  2869.  *    Swap a single longword.
  2870.  *
  2871.  * Results:
  2872.  *    None.
  2873.  *
  2874.  * Side Effects:
  2875.  *    The longword is overwritten.
  2876.  *
  2877.  *-----------------------------------------------------------------------
  2878.  */
  2879. void
  2880. Rpc_SwapLong (length, data)
  2881.     int        length;    /* Length of data (UNUSED) */
  2882.     long    *data;    /* Pointer to long to be swapped */
  2883. {
  2884.     union {
  2885.     unsigned char    bytes[4];
  2886.     unsigned long    l;
  2887.     }    swap;
  2888.     register unsigned char *cp = (unsigned char *)data;
  2889.  
  2890.     swap.bytes[0] = cp[3];
  2891.     swap.bytes[1] = cp[2];
  2892.     swap.bytes[2] = cp[1];
  2893.     swap.bytes[3] = cp[0];
  2894.     *data = swap.l;
  2895. }
  2896.  
  2897. /*-
  2898.  *-----------------------------------------------------------------------
  2899.  * Rpc_SwapShort --
  2900.  *    Swap a single short word (two bytes).
  2901.  *
  2902.  * Results:
  2903.  *    None.
  2904.  *
  2905.  * Side Effects:
  2906.  *    The shortword is overwritten.
  2907.  *
  2908.  *-----------------------------------------------------------------------
  2909.  */
  2910. /*ARGSUSED*/
  2911. void
  2912. Rpc_SwapShort (length, data)
  2913.     int                  length;
  2914.     unsigned short    *data;
  2915. {
  2916.     union {
  2917.     unsigned char    bytes[2];
  2918.     unsigned short    s;
  2919.     }        swap;
  2920.     register unsigned char *cp;
  2921.  
  2922.     cp = (unsigned char *)data;
  2923.     swap.bytes[0] = cp[1];
  2924.     swap.bytes[1] = cp[0];
  2925.     *data = swap.s;
  2926. }
  2927.